You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC

svn commit: r833166 [2/5] - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Base class for comparator expressions
+ */
+public abstract class ComparatorExpr {
+  protected abstract void appendLeafGenerator(List<LeafGenerator> list, int el,
+      int cel, boolean c, boolean explicitBound);
+
+  protected abstract void toString(PrintStream out);
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public class DoubleExpr extends FixedLengthPrimitive {
+  DoubleExpr(int index) {
+    super(index, Double.SIZE / Byte.SIZE);
+  }
+
+  @Override
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Double.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    Double f = (Double) o;
+    long l = Double.doubleToLongBits(f);
+    if (l < 0) {
+      l = ~l;
+    } else {
+      l ^= 1L << (Long.SIZE - 1);
+    }
+    b[7] = (byte) l;
+    b[6] = (byte) (l >> 8);
+    b[5] = (byte) (l >> 16);
+    b[4] = (byte) (l >> 24);
+    b[3] = (byte) (l >> 32);
+    b[2] = (byte) (l >> 40);
+    b[1] = (byte) (l >> 48);
+    b[0] = (byte) (l >> 56);
+
+  }
+
+  @Override
+  protected String getType() {
+    return "Double";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+
+public class ExprUtils {
+
+  /**
+   * Make a bag comparator expression.
+   * 
+   * @param index
+   *          the index of datum in the source tuple that is a {@link DataBag}
+   *          object.
+   * @param expr
+   *          a comparator expression that corresponds to the member tuples in
+   *          the bag.
+   * @return A comparator expression.
+   * 
+   *         <p>
+   *         Example: suppose we have tuple schema as follows: <code>
+   * Tuple {
+   *    int a;
+   *    String b;
+   *    Bag {
+   *        Tuple {
+   *            Bytes c;
+   *            int d;
+   *            String e;
+   *        }
+   *     } f
+   * }
+   * </code>
+   * 
+   *         We would like to sort by
+   *         <code>Tuple(b, Bag(Negate(Tuple(e, c))))</code>, we can construct
+   *         the ComparatorExpr as follows; <code>
+   * ComparatorExpr expr = tupleComparator(
+   *    leafComparator(1, DataType.CHARARRAY), 
+   *    bagComparator(2, 
+   *        negateComparator(
+   *            tupleComparator(
+   *                leafCmparator(2, DataType.CHARARRAY),
+   *                leafComparator(0, DataType.BYTEARRAY)))))
+   * </code>
+   */
+  public static ComparatorExpr bagComparator(int index, ComparatorExpr expr) {
+    return new BagExpr(index, expr);
+  }
+
+  /**
+   * Converting an expression to a string.
+   * 
+   * @param expr
+   * @return A string representation of the comparator expression.
+   */
+  public static String exprToString(ComparatorExpr expr) {
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bout);
+    expr.toString(out);
+    out.close();
+    return new String(bout.toByteArray());
+  }
+
+  /**
+   * Comparator for primitive types.
+   * 
+   * @param index
+   *          Index in the source tuple.
+   * @param type
+   *          One of the constants defined in {@link DataType} for primitive
+   *          types.
+   * @return Comparator expression.
+   */
+  public static ComparatorExpr primitiveComparator(int index, int type) {
+    switch (type) {
+      case DataType.BOOLEAN:
+        return new BooleanExpr(index);
+      case DataType.BYTE:
+        return new ByteExpr(index);
+      case DataType.BYTEARRAY:
+        return new BytesExpr(index);
+      case DataType.CHARARRAY:
+        return new StringExpr(index);
+      case DataType.DOUBLE:
+        return new DoubleExpr(index);
+      case DataType.FLOAT:
+        return new FloatExpr(index);
+      case DataType.INTEGER:
+        return new IntExpr(index);
+      case DataType.LONG:
+        return new LongExpr(index);
+      default:
+        throw new RuntimeException("Not a prmitive PIG type");
+    }
+  }
+
+  /**
+   * Negate comparator
+   * 
+   * @param expr
+   *          expression to perform negation on.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr negationComparator(ComparatorExpr expr) {
+    return NegateExpr.makeNegateExpr(expr);
+  }
+
+  /**
+   * Make a Tuple comparator expression.
+   * 
+   * @param exprs
+   *          member comparator expressions.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr tupleComparator(
+      Collection<? extends ComparatorExpr> exprs) {
+    return TupleExpr.makeTupleComparator(exprs);
+  }
+
+  /**
+   * Make a Tuple comparator expression.
+   * 
+   * @param exprs
+   *          member comparator expressions.
+   * @return A comparator expression.
+   */
+  public static ComparatorExpr tupleComparator(ComparatorExpr... exprs) {
+    return TupleExpr.makeTupleExpr(exprs);
+  }
+
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+/**
+ * Helper class for fixed length primitives
+ */
+public abstract class FixedLengthPrimitive extends LeafExpr {
+  protected byte[] bytes;
+
+  /**
+   * Constructor
+   * 
+   * @param index
+   * @param length
+   */
+  protected FixedLengthPrimitive(int index, int length) {
+    super(index);
+    bytes = new byte[length];
+  }
+ 
+  protected abstract void convertValue(byte[] b, Object o);
+
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    convertValue(bytes, o);
+    out.write(bytes, 0, bytes.length);
+  }
+
+  protected boolean implicitBound() {
+    return false;
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class FloatExpr extends FixedLengthPrimitive {
+  public FloatExpr(int index) {
+    super(index, Float.SIZE / Byte.SIZE);
+  }
+
+  @Override
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Float.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    Float f = (Float) o;
+    int l = Float.floatToIntBits(f);
+    if (l < 0) {
+      l = ~l;
+    } else {
+      l ^= 1 << (Integer.SIZE - 1);
+    }
+    b[3] = (byte) l;
+    b[2] = (byte) (l >> 8);
+    b[1] = (byte) (l >> 16);
+    b[0] = (byte) (l >> 24);
+  }
+
+  @Override
+  protected String getType() {
+    return "Float";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class IntExpr extends FixedLengthPrimitive {
+  public IntExpr(int index) {
+    super(index, Integer.SIZE / Byte.SIZE);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Integer.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    int l = (Integer) o;
+    l ^= 1 << (Integer.SIZE - 1);
+    b[3] = (byte) l;
+    b[2] = (byte) (l >> 8);
+    b[1] = (byte) (l >> 16);
+    b[0] = (byte) (l >> 24);
+  }
+
+  @Override
+  protected String getType() {
+    return "Integer";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Extended from ByteArrayOutputStream with direct access to the underlying byte
+ * array and explicit capacity expansion.
+ * 
+ * Also adding the capability of escaping:
+ * 
+ * <code>
+ * el - escape level for 0x00, valid value 0-252
+ * cel - escape level for 0xff, valid value 0-252
+ *                  escaped
+ * el       0x00            0x01        others
+ * 0        AS-IS           AS-IS       AS-IS
+ * >0       0x01 0x01+el    0x01FD      AS-IS
+ *                  escaped             
+ * cel      0xFF            0xFE        others
+ * 0        AS-IS           AS-IS       AS-IS
+ * >0       0xFE 0xFE-cel   0xFE02      AS-IS
+ * </code>
+ */
+class EncodingOutputStream extends ByteArrayOutputStream {
+  int escapeLevel = 0;
+  int comescLevel = 0;
+  boolean complement = false;
+
+  public EncodingOutputStream() {
+    super();
+  }
+
+  public EncodingOutputStream(int size) {
+    super(size);
+  }
+
+  public byte[] get() {
+    return buf;
+  }
+
+  public void ensureAvailable(int len) {
+    int newcount = count + len;
+    if (newcount > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+    }
+  }
+
+  public void setEscapeParams(int el, int cel, boolean c) {
+    escapeLevel = el;
+    comescLevel = cel;
+    complement = c;
+  }
+  
+  public int getEscapeLevel() {
+    return escapeLevel;
+  }
+
+  public int getComescLevel() {
+    return comescLevel;
+  }
+
+  public boolean getComplement() {
+    return complement;
+  }
+
+  void writeEscaped(int v, boolean c) {
+    ensureAvailable(2);
+    buf[count] = 0x01;
+    buf[count + 1] = (byte) v;
+    if (c) {
+      buf[count] = (byte) (~buf[count]);
+      buf[count + 1] = (byte) (~buf[count + 1]);
+    }
+    count += 2;
+  }
+
+  /**
+   * Write an escaped 0x00.
+   */
+  void escape00() {
+    writeEscaped(escapeLevel + 1, complement);
+  }
+
+  /**
+   * Write an escaped 0x01.
+   */
+  void escape01() {
+    writeEscaped(0xFD, complement);
+  }
+
+  /**
+   * Write an escaped 0xFE
+   */
+  void escapeFE() {
+    writeEscaped(0xFD, !complement);
+  }
+
+  /**
+   * write an escaped 0xFF
+   */
+  void escapeFF() {
+    writeEscaped(comescLevel + 1, !complement);
+  }
+
+  void complement(byte b[], int begin, int end) {
+    if (begin >= end) return;
+    ensureAvailable(end - begin);
+    if (!complement) {
+      System.arraycopy(b, begin, buf, count, end - begin);
+      count += (end - begin);
+    } else {
+      for (int i = begin; i < end; ++i) {
+        buf[count++] = (byte) ~b[i];
+      }
+    }
+  }
+
+  void escape(int b) {
+    switch (b) {
+      case 0:
+        escape00();
+        break;
+      case 1:
+        escape01();
+        break;
+      case 0xfe:
+        escapeFE();
+        break;
+      case 0xff:
+        escapeFF();
+        break;
+    }
+  }
+
+  public void write(int b) {
+    if (!shouldEscape(b, escapeLevel > 0, comescLevel > 0)) {
+      ensureAvailable(1);
+      if (complement) {
+        buf[count++] = (byte) ~b;
+      } else {
+        buf[count++] = (byte) b;
+      }
+    } else {
+      escape(b);
+    }
+  }
+
+  public void write(byte b[]) {
+    write(b, 0, b.length);
+  }
+
+  static boolean shouldEscape(int b, boolean checkLow, boolean checkHigh) {
+    if (checkLow && b < 0x2) return true;
+    if (checkHigh && b > 0xfd) return true;
+    return false;
+  }
+
+  public void write(byte b[], int off, int len) {
+    if ((escapeLevel > 0) || (comescLevel > 0)) {
+      ensureAvailable(len);
+      int begin = off;
+      int next = begin;
+      int end = off + len;
+      for (; begin < end; begin = next) {
+        while ((next < end)
+            && (!shouldEscape(b[next] & 0xff, escapeLevel > 0, comescLevel > 0))) {
+          ++next;
+        }
+        complement(b, begin, next);
+        if (next < end) {
+          escape(b[next] & 0xff);
+          ++next;
+        }
+      }
+    } else {
+      complement(b, off, off + len);
+    }
+  }
+}
+
+/**
+ * Generating binary keys for algorithmic comparators. A user may construct an
+ * algorithmic comparator by creating a ComparatorExpr object (through various
+ * static methods in this class). She could then create a KeyGenerator object
+ * and use it to create binary keys for tuple. The KeyGenerator object can be
+ * reused for different tuples that conform to the same schema. Sorting the
+ * tuples by the binary key yields the same ordering as sorting by the
+ * algorithmic comparator.
+ * 
+ * Basic idea (without optimization):
+ * <ul>
+ * <li>define two operations: escape and complement, that takes in a byte array,
+ * and outputs a byte array:
+ * 
+ * <pre>
+ * escape(byte[] bytes) {
+ *   for (byte b : bytes) {
+ *     if (b == 0)
+ *       emit(0x1, 0x0);
+ *     else if (b == 1)
+ *       emit(0x1, 0x2);
+ *     else emit(b);
+ *   }
+ * }
+ * 
+ * complement(byte[] bytes) {
+ *   for (byte b : bytes) {
+ *     emit(&tilde;b);
+ *   }
+ * }
+ * </pre>
+ * 
+ * <li>find ways to convert primitive types to bytes that compares in the same
+ * order as those objects.
+ * <li>operations:
+ * <ul>
+ * <li>negate(byte[] bytes) == complement(escape(bytes) + 0x0);
+ * <li>tuple(byte[] bytes1, byte[] bytes2) == escape(bytes1) + 0x0 +
+ * escape(bytes2)
+ * <li>bag(byte[] bytes1, byte[] bytes2, ... ) = escape(bytes1) + 0x0 +
+ * escape(bytes2) + ...
+ * </ul>
+ * <li>optimizations:
+ * <ul>
+ * <li>negate(negate(bytes)) == bytes;
+ * <li>tuple(a) == a;
+ * <li>tuple(a, tuple(b, c)) == tuple(a, b, c)
+ * <li>the actual output would be a concatenation of f1(o1), f2(o2), ..., where
+ * o1, o2, are leaf datums in the tuple or 0x0, and fi(oi) is a nested function
+ * of escape() and complement() calls.
+ * <li>The invariance we want to preserve is that escape(0x1) >
+ * escape(escape(0x0)) > escape(0x0) > 0x0. In the basic algorithm, these are
+ * escaped as 0x0102, 0x010100, 0x0100, 0x00, and are thus variable length. We
+ * can actually collapse nested consecutive calls of
+ * escape(escape(...escape(0))...) to escape(i, 0), where i is the level of
+ * nesting, and fi may be represented as nested inter-leaved calling of
+ * complement(bytes) and escape(i, bytes), where escape (i, 0x0) == 0x01 +
+ * (0x01+i) and escape(i, 1) == 0x010xFD. We do limit the total nesting depth by
+ * 252, which should be plenty.
+ * <li>we can further optimize fi as either escape(i, j, bytes) or
+ * complement(escape(i, j, bytes), where i is the level of nesting for escaping
+ * 0x0 and 0x1, and j the level of nesting for escaping 0xff and 0xfe.
+ * <li>If the binary keys being generated from a certain comparator either
+ * compare equal or differ at some byte position, but never the case where one
+ * is a prefix of another, then we do not need to add padding 0x0 for negate()
+ * or tuple(). This is captured by the method implicitBound() in ComparatorExpr.
+ * <li>We figure out how datums should be extracted from a tuple and being
+ * escaped only once for any expression, and write to a modified
+ * ByteArrayOutputStream in one pass.
+ * </ul>
+ * </ul>
+ * 
+ * TODO Remove the strong dependency with Pig by adding a DatumExtractor
+ * interface that allow applications to extract leaf datum from user objects,
+ * something like the following:
+ * 
+ * <pre>
+ * interface DatumExtractor {
+ *   Object extract(Object o);
+ * }
+ * </pre>
+ * 
+ * And user may do something like this:
+ * 
+ * <pre>
+ * class MyObject {
+ *  int a;
+ *  String b;
+ * }
+ * 
+ * ComparatorExpr expr = KeyBuilder.createLeafExpr(new DatumExtractor {
+ *  Object extract(Object o) {
+ *      MyObject obj = (MyObject)o;
+ *      return obj.b;
+ *  } }, DataType.CHARARRAY);
+ * </pre>
+ * 
+ * TODO Change BagExpr to IteratorExpr, so that it may be used in more general
+ * context (any Java collection).
+ * 
+ * TODO Add an ArrayExpr (for Java []).
+ */
+public class KeyGenerator {
+  private EncodingOutputStream out;
+  private List<LeafGenerator> list;
+
+  /**
+   * Create a key builder that can generate binary keys for the input key
+   * expression.
+   * 
+   * @param expr
+   *          comparator expression
+   */
+  public KeyGenerator(ComparatorExpr expr) {
+    out = new EncodingOutputStream();
+    list = new ArrayList<LeafGenerator>();
+    expr.appendLeafGenerator(list, 0, 0, false, false);
+    // illustrate(System.out);
+    System.out.println(ExprUtils.exprToString(expr));
+  }
+
+  /**
+   * Reset the key builder for a new expression.
+   * 
+   * @param expr
+   *          comparator expression
+   */
+  public void reset(ComparatorExpr expr) {
+    list.clear();
+    expr.appendLeafGenerator(list, 0, 0, false, false);
+  }
+
+  /**
+   * Generate the binary key for the input tuple
+   * 
+   * @param t
+   *          input tuple
+   * @return A {@link BytesWritable} containing the binary sorting key for the
+   *         input tuple.
+   * @throws ExecException
+   */
+  public BytesWritable generateKey(Tuple t) throws ExecException {
+    out.reset();
+    for (Iterator<LeafGenerator> it = list.iterator(); it.hasNext();) {
+      LeafGenerator e = it.next();
+      e.append(out, t);
+    }
+    BytesWritable ret = new BytesWritable();
+    ret.set(out.get(), 0, out.size());
+    return ret;
+  }
+
+  /**
+   * Illustrate how the key would be generated from source.
+   * 
+   * @param ps
+   *          The output print stream.
+   */
+  public void illustrate(PrintStream ps) {
+    for (Iterator<LeafGenerator> it = list.iterator(); it.hasNext();) {
+      LeafGenerator e = it.next();
+      e.illustrate(ps);
+    }
+    ps.print("\n");
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Base class of comparator expressions that are the leaves of the expression
+ * tree.
+ */
+public abstract class LeafExpr extends ComparatorExpr {
+  int index;
+
+  /**
+   * Constructor
+   * 
+   * @param index
+   *          tuple position index
+   */
+  protected LeafExpr(int index) {
+    this.index = index;
+  }
+
+  /**
+   * Default illustrator for leaf expressions.
+   * 
+   * @param out
+   * @param escapeLevel
+   * @param comescLevel
+   * @param complement
+   */
+  protected void illustrate(PrintStream out, int escapeLevel, int comescLevel,
+      boolean complement) {
+    out.printf("(%s, %d, %d, %d, %b)", getType(), index, escapeLevel,
+        comescLevel, complement);
+  }
+
+  protected void append(EncodingOutputStream out, Tuple tuple)
+      throws ExecException {
+	  
+		if(index >= tuple.size())
+			return;
+	    Object o = tuple.get(index);
+	    if (o == null) {    
+	      out.write(0x0);
+	      return;
+	    }
+	    appendObject(out, o);
+  }
+
+  protected abstract void appendObject(EncodingOutputStream out, Object object)
+      throws ExecException;
+
+  protected abstract String getType();
+
+  protected abstract boolean implicitBound();
+
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    if (explicitBound && implicitBound()) {
+      // requiring explicit bound, while the leaf does not already have it, so
+      // we do escaping and add a bound at end.
+      list.add(new LeafGenerator(this, el + 1, cel, c));
+      list.add(new LeafGenerator(null, el, cel, c));
+    } else {
+      list.add(new LeafGenerator(this, el, cel, c));
+    }
+  }
+
+  @Override
+  protected
+  void toString(PrintStream out) {
+    out.printf("%s(%d)", getType(), index);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Generate partial binary key for a leaf expression.
+ */
+public final class LeafGenerator {
+  final LeafExpr leaf; // may be null to represent adding '0'
+  final int escapeLevel;
+  final int comescLevel;
+  final boolean complement;
+
+  /**
+   * @param leaf
+   *          The leaf expression
+   * @param el
+   *          escape level
+   * @param cel
+   *          complement escape level
+   * @param complement
+   *          complement the value
+   */
+  public LeafGenerator(LeafExpr leaf, int el, int cel, boolean complement) {
+    this.leaf = leaf;
+    this.escapeLevel = el;
+    if(escapeLevel > 1) {
+    	System.out.println("gaurav");
+    }
+    this.comescLevel = cel;
+    this.complement = complement;
+  }
+
+  void append(EncodingOutputStream out, Tuple tuple) throws ExecException {
+    out.setEscapeParams(escapeLevel, comescLevel, complement);
+    if (leaf == null) { // add a '\0'
+      out.write(0x0);
+    } else {
+      leaf.append(out, tuple);
+    }
+  }
+
+  void illustrate(PrintStream out) {
+    if (leaf == null) {
+      out.printf("(%s, NA, %d, %d, %b)", "NULL", escapeLevel, comescLevel,
+          complement);
+    } else {
+      leaf.illustrate(out, escapeLevel, comescLevel, complement);
+    }
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class LongExpr extends FixedLengthPrimitive {
+  public LongExpr(int index) {
+    super(index, Long.SIZE / Byte.SIZE);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Long.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    long l = (Long) o;
+    l ^= 1L << (Long.SIZE - 1);
+    b[7] = (byte) l;
+    b[6] = (byte) (l >> 8);
+    b[5] = (byte) (l >> 16);
+    b[4] = (byte) (l >> 24);
+    b[3] = (byte) (l >> 32);
+    b[2] = (byte) (l >> 40);
+    b[1] = (byte) (l >> 48);
+    b[0] = (byte) (l >> 56);
+  }
+
+  @Override
+  protected String getType() {
+    return "Long";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Negate expression
+ */
+public class NegateExpr extends ComparatorExpr {
+  protected final ComparatorExpr expr;
+
+  /**
+   * constructor
+   * 
+   * @param expr
+   */
+  protected NegateExpr(ComparatorExpr expr) {
+    this.expr = expr;
+  }
+
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    expr.appendLeafGenerator(list, cel, el, !c, true);
+  }
+
+  /**
+   * @return the child expression
+   */
+  public ComparatorExpr childExpr() {
+    return expr;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Negate(");
+    expr.toString(out);
+    out.print(")");
+  }
+  
+  /**
+   * Make a negate expression
+   */
+  public static ComparatorExpr makeNegateExpr(ComparatorExpr expr) {
+    if (expr instanceof NegateExpr) {
+      NegateExpr negateExpr = (NegateExpr) expr;
+      return negateExpr.childExpr();
+    }
+
+    return new NegateExpr(expr);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class ShortExpr extends FixedLengthPrimitive {
+  public ShortExpr(int index) {
+    super(index, Short.SIZE / Byte.SIZE);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Short.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    short l = (Short) o;
+    l ^= 1 << (Short.SIZE - 1);
+    b[1] = (byte) l;
+    b[0] = (byte) (l >> 8);
+  }
+
+  @Override
+  protected String getType() {
+    return "Short";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+public final class StringExpr extends LeafExpr {
+  public StringExpr(int index) {
+    super(index);
+  }
+
+  @Override
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    String s = (String) o;
+    try {
+      ByteBuffer bytes = Text.encode(s);
+      out.write(bytes.array(), 0, bytes.limit());
+    } catch (IOException e) {
+      throw new RuntimeException("Conversion error", e);
+    }
+  }
+
+  @Override
+  protected boolean implicitBound() {
+    return true;
+  }
+
+  @Override
+  protected String getType() {
+    return "String";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class TupleExpr extends ComparatorExpr {
+  protected final List<ComparatorExpr> exprs;
+
+  protected TupleExpr(List<ComparatorExpr> exprs) {
+    this.exprs = exprs;
+  }
+
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    int length = exprs.size();
+    for (int i = 0; i < length - 1; ++i) {
+      ComparatorExpr e = exprs.get(i);
+      e.appendLeafGenerator(list, el, cel, c, true);
+    }
+    exprs.get(length - 1).appendLeafGenerator(list, el, cel, c, explicitBound);
+  }
+
+  /**
+   * Get the children expressions.
+   * 
+   * @return The children expressions.
+   */
+  public List<ComparatorExpr> childrenExpr() {
+    return exprs;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Tuple");
+    String sep = "(";
+    for (Iterator<ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+      out.printf(sep);
+      it.next().toString(out);
+      sep = ", ";
+    }
+
+    out.print(")");
+  }
+  
+  /**
+   * Make a tuple expression
+   * 
+   * @param exprs
+   *          children expressions
+   * @return a comparator expression
+   */
+  public static ComparatorExpr makeTupleExpr(ComparatorExpr... exprs) {
+    if (exprs.length == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.length == 1) return exprs[0];
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.length);
+    for (ComparatorExpr e : exprs) {
+      if (e instanceof TupleExpr) {
+        aggr.addAll(((TupleExpr) e).childrenExpr());
+      } else {
+        aggr.add(e);
+      }
+    }
+    return new TupleExpr(aggr);
+  }
+  
+  /**
+   * Make a tuple expression
+   * 
+   * @param exprs
+   *          children expressions
+   * @return a comparator expression
+   */
+  public static ComparatorExpr makeTupleComparator(
+      Collection<? extends ComparatorExpr> exprs) {
+    if (exprs.size() == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.size() == 1) return exprs.iterator().next();
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.size());
+    for (Iterator<? extends ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+      ComparatorExpr e = it.next();
+      if (e instanceof TupleExpr) {
+        aggr.addAll(((TupleExpr) e).childrenExpr());
+      } else {
+        aggr.add(e);
+      }
+    }
+    return new TupleExpr(aggr);
+  }
+
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Utilities to allow PIG Storer to generate keys for sorted Zebra tables
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Implementation of PIG Storer/Loader Interfaces
+ */
+package org.apache.hadoop.zebra.pig;
+

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java Thu Nov  5 21:02:57 2009
@@ -26,57 +26,93 @@
 import org.apache.hadoop.io.file.tfile.Utils;
 import org.apache.pig.data.DataType;
 
+/**
+ * Zebra Column Type
+ */
 public enum ColumnType {
+  /**
+   * Any type
+   */
   ANY("any") {
 	  public byte pigDataType() {
 		  return DataType.UNKNOWN;
 	  }
   },
+  /**
+   * Integer
+   */
   INT("int") {
     public byte pigDataType() {
       return DataType.INTEGER;
     }
   },
+  /**
+   * Long
+   */
   LONG("long") {
     public byte pigDataType() {
       return DataType.LONG;
     }
   },
+  /**
+   * Float
+   */
   FLOAT("float") {
     public byte pigDataType() {
       return DataType.FLOAT;
     }
   },
+  /**
+   * Double
+   */
   DOUBLE("double") {
     public byte pigDataType() {
       return DataType.DOUBLE;
     }
   },
+  /**
+   * Boolean
+   */
   BOOL("bool") {
     public byte pigDataType() {
       return DataType.BOOLEAN;
     }
   },
+  /**
+   * Collection
+   */
   COLLECTION("collection") {
     public byte pigDataType() {
       return DataType.BAG;
     }
   },
+  /**
+   * Map
+   */
   MAP("map") {
     public byte pigDataType() {
       return DataType.MAP;
     }
   },
+  /**
+   * Record
+   */
   RECORD("record") {
     public byte pigDataType() {
       return DataType.TUPLE;
     }
   },
+  /**
+   * String
+   */
   STRING("string") {
     public byte pigDataType() {
       return DataType.CHARARRAY;
     }
   },
+  /**
+   * Bytes
+   */
   BYTES("bytes") {
     public byte pigDataType() {
       return DataType.BYTEARRAY;
@@ -102,6 +138,13 @@
     return ColumnType.valueOf(name.toUpperCase());
   }
 
+  /**
+   * Get the Zebra type from a Pig type
+   *
+   * @param dt Pig type
+   *
+   * @return Zebra type
+   */
   public static ColumnType getTypeByPigDataType(byte dt) {
     for (ColumnType ct : ColumnType.values()) {
       if (ct.pigDataType() == dt) {
@@ -111,18 +154,40 @@
     return null;
   }
 
+  /**
+   * Get the Zebra type name for the passed in Zebra column type
+   *
+   * @param columntype Zebra column type
+   *
+   * @return string representation of the Zebra column type
+   */
   public static String findTypeName(ColumnType columntype) {
 	  return columntype.getName();
   }
 
+  /**
+   * Get the Zebra type name for this Zebra column type
+   *
+   * @return string representation of the Zebra column type
+   */
   public String getName() {
     return name;
   }
 
+  /**
+   * Same as getType
+   */
   public String toString() {
     return name;
   }
 
+  /**
+   * check if a column type contains internal schema
+   *
+   * @param columnType Zebra column type
+   *
+   * @return true if the type is RECORD, MAP or COLLECTION
+   */
   public static boolean isSchemaType(ColumnType columnType) {
 	  return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
   }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Thu Nov  5 21:02:57 2009
@@ -37,6 +37,9 @@
 
   private static final long schemaVersion = 1L;
 
+  /**
+   * Column Schema in Schema
+   */
   public static class ColumnSchema {
     private String name;
     private ColumnType type;
@@ -73,6 +76,8 @@
 
     /**
      * access function to get the column name 
+     *
+     * @return name of the column
      */
     public String getName() {
       return name;
@@ -80,6 +85,8 @@
     
     /**
      * access function to get the column type 
+     *
+     * @return column type
      */
     public ColumnType getType() {
       return type;
@@ -87,6 +94,8 @@
     
     /**
      * access function to get the column name 
+     *
+     * @return column index in the parent schema
      */
     public int getIndex() {
       return index;
@@ -128,8 +137,8 @@
     /**
      * Compare two field schema for equality
      * 
-     * @param fschema
-     * @param fother
+     * @param fschema one column schema to be compared
+     * @param fother the other column schema to be compared
      * @return true if ColumnSchema are equal, false otherwise
      */
     public static boolean equals(ColumnSchema fschema, ColumnSchema fother) {
@@ -195,28 +204,47 @@
     /*
      * Returns the schema for the next level structure, in record, collection
      * and map.
+     *
+     * @return Schema of the column
      */
     public Schema getSchema() {
       return schema;
     }
   }
 
-  /*
-   * helper class to parse a column name string one section at a time and find
+  /**
+   * Helper class to parse a column name string one section at a time and find
    * the required type for the parsed part.
    */
   public static class ParsedName {
-    public String mName;
-    int mKeyOffset; // the offset where the keysstring starts
-    public ColumnType mDT = ColumnType.ANY; // parent's type
+    private String mName;
+    private int mKeyOffset; // the offset where the keysstring starts
+    private ColumnType mDT = ColumnType.ANY; // parent's type
 
+    /**
+     * Default ctor
+     */
     public ParsedName() {
     }
 
+    /**
+     * Set the name
+     *
+     * @param name
+     *            column name string
+     */
     public void setName(String name) {
       mName = name;
     }
 
+    /**
+     * Set the name and type
+     *
+     * @param name
+     *            column name string
+     * @param pdt
+     *            column type
+     */
     public void setName(String name, ColumnType pdt) {
       mName = name;
       mDT = pdt;
@@ -227,21 +255,42 @@
       mKeyOffset = keyStrOffset;
     }
 
-    void setDT(ColumnType dt) {
+    /**
+     * Set the column type
+     *
+     * @param dt
+     *          column type to be set with
+     */
+    public void setDT(ColumnType dt) {
       mDT = dt;
     }
 
-    ColumnType getDT() {
+    /**
+     * Get the column type
+     * 
+     * @return column type
+     */
+    public ColumnType getDT() {
       return mDT;
     }
 
-    String getName() {
+    /**
+     * Get the column name
+     *
+     * @return column name
+     */
+    public String getName() {
       return mName;
     }
 
+    /**
+     * Parse one sector of a fully qualified column name; also checks validity
+     * of use of the MAP and RECORD delimiters
+     *
+     * @param fs
+     *          column schema this column name is checked against with          
+     */
     public String parseName(Schema.ColumnSchema fs) throws ParseException {
-      // parse one sector of a fq name, also checks sanity of MAP and RECORD
-      // fields
       int fieldIndex, hashIndex;
       fieldIndex = mName.indexOf('.');
       hashIndex = mName.indexOf('#');

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Zebra Schema
+ */
+package org.apache.hadoop.zebra.schema;
+

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Thu Nov  5 21:02:57 2009
@@ -50,7 +50,6 @@
    
    // tmp schema file name, used as a flag of unfinished CG
    private final static String SCHEMA_FILE = ".schema";
-   private final static String DEFAULT_COMPARATOR = "memcmp";
 	// schema version, should be same as BasicTable's most of the time
    private final static Version SCHEMA_VERSION =
      new Version((short) 1, (short) 1);
@@ -87,19 +86,18 @@
      this.version = SCHEMA_VERSION;
    }
 
-   public CGSchema(Schema schema, boolean sorted) {
+   public CGSchema(Schema schema, boolean sorted, String comparator) {
      this.sorted = sorted;
-     this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+     this.comparator = (sorted) ? (comparator == null ? SortInfo.DEFAULT_COMPARATOR : comparator) : "";
      this.schema = schema;
      this.version = SCHEMA_VERSION;
    }
 
-   public CGSchema(Schema schema, boolean sorted, String name, String serializer, String compressor, String owner, String group, short perm) {
-  	this(schema, sorted);
+   public CGSchema(Schema schema, boolean sorted, String comparator, String name, String serializer, String compressor, String owner, String group, short perm) {
+  	this(schema, sorted, comparator);
     this.name = name;
   	this.serializer = serializer;
   	this.compressor = compressor;
-//  	this.owner = owner;
   	this.group = group;
   	this.perm  = perm;
    }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Thu Nov  5 21:02:57 2009
@@ -41,6 +41,9 @@
  * insertions and queries respectively.
  */
 public class Partition {
+  /**
+   * Storage split types
+   */
   public enum SplitType {
     NONE, RECORD, COLLECTION, MAP
   }
@@ -317,7 +320,7 @@
      */
     public CGSchema generateDefaultCGSchema(String name, String compressor,
         String serializer, String owner, String group, 
-        short perm, final int defaultCGIndex) throws ParseException {
+        short perm, final int defaultCGIndex, String comparator) throws ParseException {
       Schema schema = new Schema();
       Schema.ColumnSchema fs;
       for (int i = 0; i < mSchema.getNumColumns(); i++) {
@@ -369,13 +372,13 @@
         }
       }
       CGSchema defaultSchema =
-          (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, name, serializer, compressor, owner, group, perm));
+          (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, comparator, name, serializer, compressor, owner, group, perm));
       return defaultSchema;
     }
 
     /**
      * returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
-     * aross different hash keys
+     * across different hash keys
      */
     public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
         Schema.ColumnSchema fs) {
@@ -661,17 +664,21 @@
   private Projection mProjection = null;
   private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
   private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+  private String comparator;
+  private boolean mSorted;
+  private SortInfo mSortInfo;
 
   /*
    * ctor used for LOAD
    */
-  public Partition(Schema schema, Projection projection, String storage)
+  public Partition(Schema schema, Projection projection, String storage, String comparator)
       throws ParseException, IOException {
     mSchema = schema;
     TableStorageParser sparser =
-        new TableStorageParser(new StringReader(storage), this, mSchema);
+        new TableStorageParser(new StringReader(storage), this, mSchema, comparator);
     mPartitionInfo = new PartitionInfo(schema);
-    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+    ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+    sparser.StorageSchema(cgschemas);
     mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
     mProjection = projection;
     Schema projSchema = projection.getProjectionSchema();
@@ -747,15 +754,34 @@
   /*
    * ctor used by STORE
    */
-  public Partition(final String schema, final String storage)
+  public Partition(final String schema, final String storage, String comparator, String sortColumns)
+        throws ParseException, IOException
+  {
+    TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+    mSchema = parser.RecordSchema(null);
+    mSortInfo = SortInfo.parse(sortColumns, mSchema, comparator);
+    mSorted = (mSortInfo != null && mSortInfo.size() > 0);
+    this.comparator = (mSorted ? mSortInfo.getComparator() : "");
+    storeConst(storage);
+  }
+
+  public Partition(String schema, final String storage, String comparator)
       throws ParseException, IOException
   {
     TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
     mSchema = parser.RecordSchema(null);
+    this.comparator = comparator;
+    storeConst(storage);
+  }
+
+  private void storeConst(final String storage)
+      throws ParseException, IOException
+  {
     mPartitionInfo = new PartitionInfo(mSchema);
     TableStorageParser sparser =
-        new TableStorageParser(new StringReader(storage), this, mSchema);
-    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();    
+      new TableStorageParser(new StringReader(storage), this, mSchema, this.comparator);
+    ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+    sparser.StorageSchema(cgschemas);
     mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
     int size = mSchema.getNumColumns();
     PartitionInfo.ColumnMappingEntry cgindex;
@@ -805,6 +831,18 @@
       mPCNeedMap.get(i).createMap();
   }
 
+  public SortInfo getSortInfo() {
+    return mSortInfo;
+  }
+
+  public boolean isSorted() {
+    return mSorted;
+  }
+
+  public String getComparator() {
+    return comparator;
+  }
+
   /**
    * returns table schema
    */
@@ -856,10 +894,10 @@
     {
       if (projectedKeys != null)
       {
-        pn.mDT = ColumnType.MAP;
+        pn.setDT(ColumnType.MAP);
         map = true;
       } else {
-        pn.mDT = ColumnType.ANY;
+        pn.setDT(ColumnType.ANY);
         PartitionInfo.ColumnMappingEntry cme;
         for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
         {
@@ -985,7 +1023,7 @@
       pn.parseName(fs);
       Schema.ParsedName oripn = new Schema.ParsedName();
       for (int i = 0; i < schema.getNumColumns(); i++) {
-        oripn.setName(new String(pn.mName), pn.mDT);
+        oripn.setName(new String(pn.getName()), pn.getDT());
         child = schema.getColumn(i);
         if (getCGIndex(child) == null) {
           // not a CG: go one level lower
@@ -1038,7 +1076,7 @@
       PartitionedColumn parent, Schema.ColumnSchema child, int i,
       int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
     CGEntry cgentry;
-    if (pn.mDT == ColumnType.ANY) {
+    if (pn.getDT() == ColumnType.ANY) {
       // this subtype is MAP split and the projection is on the whole MAP:
       // => need to add stitches for all split keys
 
@@ -1236,7 +1274,6 @@
     while (it.hasNext())
       it.next().getValue().read();
 
-    TypesUtils.resetTuple(t);
     // dispatch
     mExecs.get(mStitchSize - 1).setRecord(t);
 
@@ -1313,9 +1350,9 @@
 
   public CGSchema generateDefaultCGSchema(String name, String compressor, String serializer,
       String owner, String group, short perm, 
-      final int defaultCGIndex) throws ParseException {
+      final int defaultCGIndex, String comparator) throws ParseException {
     return mPartitionInfo.generateDefaultCGSchema(name, compressor, serializer,owner, group, perm,
-        defaultCGIndex);
+        defaultCGIndex, comparator);
   }
 
   public void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+/**
+ * Sortness related Information
+ */
+public class SortInfo {
+  public static final String DEFAULT_COMPARATOR =  TFile.COMPARATOR_MEMCMP;
+  private boolean global = true;
+  private String[] columns;
+  private int[] indices = null;
+  private ColumnType[] types = null;
+  private String comparator = null;
+  private Schema schema = null;
+  private static final String SORTED_COLUMN_DELIMITER = ",";
+
+  private SortInfo(String[] columns, int[] sortIndices, ColumnType[] sortColTypes, String comparator, Schema schema)
+  {
+    this.columns = columns;
+    this.indices = sortIndices;
+    this.comparator = comparator;
+    this.schema = schema;
+    this.types = sortColTypes;
+  }
+
+  /**
+   * Get an array of the sorted column names with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of strings of sorted column names
+   */
+  public String[] getSortColumnNames() {
+    return columns;
+  }
+  
+  /**
+   * Get an array of zebra types of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of strings of sorted column names
+   */
+  public ColumnType[] getSortColumnTypes() {
+	  return types;
+  }
+
+  /**
+   * Get an array of column indices in schema of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of strings of sorted column names
+   */
+  public int[] getSortIndices() {
+    return indices;
+  }
+
+  /**
+   * Get the number of sorted columns
+   *
+   * @return number of sorted columns
+   */
+  public int size() {
+    return (columns == null ? 0 : columns.length);
+  }
+
+  /**
+   * Get the comparator name 
+   *
+   * @return  comparator name
+   */
+  public String getComparator() {
+    return comparator;
+  }
+
+  /**
+   * Check if the two SortInfo objects are equal
+   *
+   * @return true if one's sort columns is equal to a leading portion of the other's
+   */
+  public boolean equals(String sortcolumns, String comparator) throws IOException {
+    if (sortcolumns == null || sortcolumns.trim().isEmpty())
+    {
+      return false;
+    }
+    String[] columns = sortcolumns.trim().split(SORTED_COLUMN_DELIMITER);
+    for (String column : columns)
+    {
+    	if (schema.getColumn(column) == null)
+            throw new IOException(column + " does not exist in schema");
+    }
+    if (this.columns.length <= columns.length)
+    {
+      for (int i = 0; i < this.columns.length; i++)
+     {
+       if (!this.columns[i].equals(columns[i]))
+         return false;
+     }
+    } else {
+      for (int i = 0; i < columns.length; i++)
+     {
+       if (!columns[i].equals(this.columns[i]))
+         return false;
+     }
+    }
+    if (this.comparator == null && comparator == null)
+    {
+      return true;
+    } else if (this.comparator != null && comparator != null)
+    {
+      return (this.comparator.equals(comparator));
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Build a SortInfo object from sort column names, schema, and comparator
+   *
+   * @param sortStr
+   *                     comma-separated sort column names
+   * @param schema
+   *                     schema of the Zebra table for the sort columns
+   * @param comparator
+   *                     comparator name
+   * @return             newly built SortInfo object
+   */
+  public static SortInfo parse(String sortStr, Schema schema, String comparator) throws IOException
+  {
+    if (sortStr == null || sortStr.trim().isEmpty())
+    {
+      return null;
+    }
+    String[] sortedColumns = sortStr.trim().split(SORTED_COLUMN_DELIMITER);
+    int[] sortColIndices = new int[sortedColumns.length];
+    ColumnType[] sortColTypes = new ColumnType[sortedColumns.length];
+    Schema.ColumnSchema cs;
+    for (int i = 0; i < sortedColumns.length; i++)
+    {
+      sortedColumns[i] = sortedColumns[i].trim();
+      /*
+       * sanity check the sort column's existence
+       */
+      if ((cs = schema.getColumn(sortedColumns[i])) == null)
+        throw new IOException(sortedColumns[i] + " does not exist in schema");
+      sortColIndices[i] = schema.getColumnIndex(sortedColumns[i]);
+      sortColTypes[i] = schema.getColumn(sortedColumns[i]).getType();
+    }
+    String comparatorInUse = (sortedColumns.length > 0 ?
+        (comparator == null || comparator.isEmpty() ?
+          DEFAULT_COMPARATOR : comparator) : null);
+    return new SortInfo(sortedColumns, sortColIndices, sortColTypes, comparatorInUse, schema);
+  }
+
+  /**
+   * Build a string of comma-separated sort column names from an array of sort column names
+   *
+   * @param names an array of sort column names
+   *
+   * @return a string of comma-separated sort column names
+   */
+  public static String toSortString(String[] names)
+  {
+    if (names == null || names.length == 0)
+      return null;
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < names.length; i++)
+    {
+      if (i > 0)
+        sb.append(SORTED_COLUMN_DELIMITER);
+      sb.append(names[i]);
+    }
+    return sb.toString();
+  }
+}

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java Thu Nov  5 21:02:57 2009
@@ -30,14 +30,14 @@
 import org.apache.hadoop.zebra.parser.ParseException;
 
 
+/**
+ * This class extracts a subfield from a column or subcolumn stored
+ * in entirety on disk
+ * It should be used only by readers whose serializers do not
+ * support projection
+ */
 public class SubColumnExtraction {
-	/**
-	 * This class extracts a subfield from a column or subcolumn stored
-   * in entirety on disk
-	 * It should be used only by readers whose serializers do not
-   * support projection
-	 */
-	public static class SubColumn {
+	static class SubColumn {
 		Schema physical;
 		Projection projection;
 		ArrayList<SplitColumn> exec = null;
@@ -77,16 +77,17 @@
 				pn.setName(name);
 				fs = physical.getColumnSchema(pn);
 				if (keySet != null)
-				  pn.mDT = ColumnType.MAP;
+				  pn.setDT(ColumnType.MAP);
 				if (fs == null)
 		     		continue; // skip non-existing field
 		
 				j = fs.getIndex();
-				if (pn.mDT == ColumnType.MAP || pn.mDT == ColumnType.RECORD || pn.mDT == ColumnType.COLLECTION)
+				ColumnType ct = pn.getDT();
+				if (ct == ColumnType.MAP || ct == ColumnType.RECORD || ct == ColumnType.COLLECTION)
 				{
 					// record/map subfield is expected
-					sc = new SplitColumn(j, pn.mDT);
-          if (pn.mDT == ColumnType.MAP)
+					sc = new SplitColumn(j, ct);
+          if (ct == ColumnType.MAP)
             sclist.add(sc);
 					exec.add(sc); // breadth-first
 					// (i, j) represents the mapping between projection and physical schema
@@ -105,18 +106,19 @@
      * build the split executions
      */
 		private void buildSplit(SplitColumn parent, Schema.ColumnSchema fs,
-        Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
+        final Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
 		{
 			// recursive call to get the next level schema
-			if (pn.mDT != fs.getType())
+		  ColumnType ct = pn.getDT();
+			if (ct != fs.getType())
 	      	throw new ParseException(fs.getName()+" is not of proper type.");
 	
 			String prefix;
 			int fieldIndex;
 			SplitColumn sc;
-			Partition.SplitType callerDT = (pn.mDT == ColumnType.MAP ? Partition.SplitType.MAP :
-				                               (pn.mDT == ColumnType.RECORD ? Partition.SplitType.RECORD :
-				                                 (pn.mDT == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
+			Partition.SplitType callerDT = (ct == ColumnType.MAP ? Partition.SplitType.MAP :
+				                               (ct == ColumnType.RECORD ? Partition.SplitType.RECORD :
+				                                 (ct == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
 				                        	         Partition.SplitType.NONE)));
 			prefix = pn.parseName(fs);
 			Schema schema = fs.getSchema();
@@ -133,11 +135,12 @@
 				fieldIndex = 0;
 			}
 	
-			if (pn.mDT != ColumnType.ANY)
+		  ct = pn.getDT();
+			if (ct != ColumnType.ANY)
 			{
 				// record subfield is expected
-			 	sc = new SplitColumn(fieldIndex, pn.mDT);
-        if (pn.mDT == ColumnType.MAP)
+			 	sc = new SplitColumn(fieldIndex, ct);
+        if (ct == ColumnType.MAP)
           sclist.add(sc);
 			 	exec.add(sc); // breadth-first
 			 	buildSplit(sc, fs, pn, projIndex, null);

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Thu Nov  5 21:02:57 2009
@@ -30,7 +30,7 @@
 import org.apache.hadoop.zebra.types.*;
 
 public class TableStorageParser {
-		public TableStorageParser(java.io.Reader in, Partition partition, Schema schema) { this(in); mSchema = schema; this.partition = partition;}
+		public TableStorageParser(java.io.Reader in, Partition partition, Schema schema, String comparator) { this(in); mSchema = schema; this.partition = partition; this.comparator = comparator; }
 		private Schema mSchema;
 		private int mDefaultCGIndex = -1;
 		private String mName = null;
@@ -39,6 +39,7 @@
 		private short mPerm = -1;
 		private int mCGCount = 0;
 		private Partition partition;
+    private String comparator = null;
 }
 PARSER_END(TableStorageParser)
 
@@ -54,9 +55,10 @@
 TOKEN : { <COMPRESSOR : "lzo" | "gz"> }
 TOKEN : { <SERIALIZER : ("pig" | "avro")> }
 
-TOKEN : { <ORDER	 : "order by"> }
 TOKEN : { <COMPRESS	 : "compress by"> }
 TOKEN : { <SERIALIZE : "serialize by"> }
+TOKEN : { <ASC : "ASC"> }
+TOKEN : { <DESC: "DESC"> }
 TOKEN : { <SECURE 	 : "secure by"> }
 TOKEN : { <USER		 : "user"> }
 TOKEN : { <GROUP	 : "group"> }
@@ -64,8 +66,6 @@
 TOKEN : { <AS		 : "as"> }
 
 
-
-
 TOKEN:
 {
  	<#LETTER : ["a"-"z", "A"-"Z"] >
@@ -77,62 +77,17 @@
 |   <SHORT	:	(<OCTAL>){3}	>
 }
 
-ArrayList<CGSchema> StorageSchema() throws ParseException :
+void StorageSchema(ArrayList<CGSchema> s) throws ParseException :
 {
-	ArrayList<CGSchema> s = new ArrayList();
 	CGSchema fs;
 	CGSchema defaultSchema;
 }
 {
 	try {
-		LOOKAHEAD(2) <EOF>
-		{
-			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, 0);
-			if (defaultSchema != null)
-				s.add(defaultSchema);
-
-      // check column group names, add system created names when necessary;
-      HashSet<String> cgNames = new HashSet<String>();
-      ArrayList<CGSchema> unnamed = new ArrayList<CGSchema>(); 	
-      for (int i = 0; i < s.size(); i++) { 
-        CGSchema cgSchema = s.get(i);
-        String str = cgSchema.getName();
-        if (str != null) {
-          if (!cgNames.add(str)) {
-            throw new ParseException("Duplicate column group names.");
-          }
-        } else {
-          unnamed.add(cgSchema);
-        }
-      }
-      
-      int digits = 1;
-      int total = unnamed.size();
-      while (total >= 10) {
-        ++digits;
-        total /= 10;
-      }
-      String formatString = "%0" + digits + "d";
-      
-      int idx = 0;
-      for (int i = 0; i < unnamed.size(); i++) { 
-        CGSchema cgSchema = unnamed.get(i);
-        String str = null;
-        while (true) {
-          str = "CG" + String.format(formatString, idx++);
-          if (!cgNames.contains(str)) {
-            break;
-          }
-        }
-        cgSchema.setName(str);
-      }
-			return s;
-		}
-	|
 		fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);}
 		(";" fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);})* <EOF>
 		{
-			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex);
+			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex, comparator);
 			if (defaultSchema != null)
 				s.add(defaultSchema);
 
@@ -171,13 +126,29 @@
         }
         cgSchema.setName(str);
       }
-			return s;
+			return;
 		}
 	} catch (TokenMgrError e) {
 		throw new ParseException(e.getMessage());
 	}
 }
 
+boolean ascdsc() throws ParseException :
+{
+  Token t1 = null, t2 = null;
+}
+{
+  (
+		t1 = <ASC>
+  | t2 = <DESC>
+  )
+  {
+    if (t2 != null)
+      return false;
+    return true;
+  }
+}
+
 CGSchema FieldSchema() throws ParseException:
 {
 	Token t1 = null, t2 = null; 
@@ -310,8 +281,9 @@
 			mOwner		= owner;
 			mGroup		= group;
 			mPerm 		= perm; 
-		} else
-			cs = new CGSchema(fs, false, name, serializer, compressor, owner, group, perm);
+		} else {
+			cs = new CGSchema(fs, false, comparator, name, serializer, compressor, owner, group, perm);
+    }
 		return cs;
 	}
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Thu Nov  5 21:02:57 2009
@@ -212,8 +212,8 @@
    * @throws IOException
    * 
    */
-  public static void formatTuple(Tuple tuple, String projection) throws IOException {
-    Tuple one = createTuple(Projection.getNumColumns(projection));
+  public static void formatTuple(Tuple tuple, int ncols) throws IOException {
+    Tuple one = createTuple(ncols);
     tuple.reference(one);
     return;
     /*

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Thu Nov  5 21:02:57 2009
@@ -81,14 +81,13 @@
     return String.format("%s%09d", prefix, random.nextInt(max));
   }
 
-  static int createBasicTable(int parts, int rows, String strSchema, String storage,
-      Path path, boolean properClose, boolean sorted) throws IOException {
+  static int createBasicTable(int parts, int rows, String strSchema, String storage, String sortColumns,
+      Path path, boolean properClose) throws IOException {
     if (fs.exists(path)) {
       BasicTable.drop(path, conf);
     }
 
-    BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage,
-        sorted, conf);
+    BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage, sortColumns, null, conf);
     writer.finish();
 
     int total = 0;
@@ -96,6 +95,7 @@
     String colNames[] = schema.getColumns();
     Tuple tuple = TypesUtils.createTuple(schema);
 
+    boolean sorted = writer.isSorted();
     for (int i = 0; i < parts; ++i) {
       writer = new BasicTable.Writer(path, conf);
       TableInserter inserter = writer.getInserter(
@@ -230,10 +230,10 @@
   }
 
   static void doReadWrite(Path path, int parts, int rows, String schema,
-      String storage, String projection, boolean properClose, boolean sorted)
+      String storage, String sortColumns, String projection, boolean properClose, boolean sorted)
       throws IOException, ParseException {
-    int totalRows = createBasicTable(parts, rows, schema, storage, path,
-        properClose, sorted);
+    int totalRows = createBasicTable(parts, rows, schema, storage, sortColumns, path,
+        properClose);
     if (rows == 0) {
       Assert.assertEquals(rows, 0);
     }
@@ -248,17 +248,17 @@
 
   public void testMultiCGs() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableMultiCGs");
-    doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", "SF_f,SF_a,SF_c,SF_d", true, false);
+    doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", null, "SF_f,SF_a,SF_c,SF_d", true, false);
   }
 
   public void testCornerCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableCornerCases");
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, true);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
+    doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
+    doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
   }
 
   static int doReadOnly(TableScanner scanner) throws IOException, ParseException {
@@ -289,7 +289,7 @@
   @Test
   public void testNullSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableNullSplits");
-    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", "a", path, true);
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     reader.setProjection("a,d,c,f");
     Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
@@ -301,14 +301,14 @@
   @Test
   public void testNegativeSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestNegativeSplits");
-    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", "", path, true);
     rangeSplitBasicTable(-1, totalRows, "a,d,c,f", path);
   }
 
   @Test
   public void testMetaBlocks() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableMetaBlocks");
-    createBasicTable(3, 100, "a, b, c", "", path, false, false);
+    createBasicTable(3, 100, "a, b, c", "", null, path, false);
     BasicTable.Writer writer = new BasicTable.Writer(path, conf);
     BytesWritable meta1 = makeKey(1234);
     BytesWritable meta2 = makeKey(9876);
@@ -350,8 +350,8 @@
   @Test
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableNormal");
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, true);
+    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
   }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java Thu Nov  5 21:02:57 2009
@@ -72,7 +72,7 @@
     BasicTable.drop(path, conf);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
 
     Schema schema = writer.getSchema();