Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC

svn commit: r833166 [1/5] - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...

Author: gates
Date: Thu Nov  5 21:02:57 2009
New Revision: 833166

URL: http://svn.apache.org/viewvc?rev=833166&view=rev
Log:
PIG-997 Sorted Table Support by Zebra.

Added:
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ComparatorExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/DoubleExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ExprUtils.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FixedLengthPrimitive.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/FloatExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/IntExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/KeyGenerator.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSortedBasicTableSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSampleSortedTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSortedTableZebraKeyGenerator.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinInteger.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinMultipleColsSort.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorer.java
Removed:
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/build-contrib.xml
    hadoop/pig/trunk/contrib/zebra/build.xml
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMapOfRecord.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMixedType1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNonDefaultWholeMapSplit.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord2Map.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord3Map.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecordMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSchema.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestWrite.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapType.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableStorer.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestUnionMixedTypes.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnGroupName.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnSecurity.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageCollection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMap.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc1.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc2.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageMisc3.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageRecord.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorePrimitive.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Nov  5 21:02:57 2009
@@ -5,6 +5,8 @@
   INCOMPATIBLE CHANGES
 
   IMPROVEMENTS
+
+    PIG-997 Sorted Table Support by Zebra (yanz via gates)
   	
 	PIG-996 Add findbugs, checkstyle, and clover to zebra build file (chaow via
 	        gates)

Modified: hadoop/pig/trunk/contrib/zebra/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build-contrib.xml?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build-contrib.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build-contrib.xml Thu Nov  5 21:02:57 2009
@@ -44,7 +44,7 @@
   <property name="conf.dir" location="${pig.root}/conf"/>
   <property name="test.junit.output.format" value="plain"/>
   <property name="test.output" value="no"/>
-  <property name="test.timeout" value="900000"/>
+  <property name="test.timeout" value="9000000"/>
   <property name="build.dir" location="${pig.root}/build/contrib/${name}"/>
   <property name="build.javadoc"
 	  location="${pig.root}/build/contrib/${name}/docs"/>
@@ -53,6 +53,7 @@
   <property name="build.examples" location="${build.dir}/examples"/>
   <property name="pig.log.dir" location="${build.dir}/test/logs"/>
   <property name="hadoop.jarfile" value="hadoop20.jar" />
+  <property name="pig.jarfile" value="pig-0.6.0-dev-core.jar" />
   <property name="hbase.jarfile" value="hbase-0.18.1.jar" />
   <property name="hbase.test.jarfile" value="hbase-0.18.1-test.jar" />
 
@@ -103,6 +104,9 @@
       <include name="${hbase.jarfile}" />
       <include name="${hbase.test.jarfile}" />
     </fileset>
+    <fileset dir="${pig.root}/build">
+      <include name="${pig.jarfile}" />
+    </fileset>
     <fileset dir="${pig.root}/build/ivy/lib">
        <include name="**/*.jar"/>
     </fileset>

Modified: hadoop/pig/trunk/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build.xml?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build.xml Thu Nov  5 21:02:57 2009
@@ -170,9 +170,16 @@
       <classpath refid="test.classpath"/>
       <formatter type="${test.junit.output.format}" />
 
+      <!-- For the time being, we disable two test cases that need a real cluster to run -->
       <batchtest todir="${build.test}">
-        <fileset dir="${src.test}"
-                 includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+        <fileset dir="${src.test}" includes="**/Test*.java" excludes="**/TestCheckin*.java">
+          <not>
+            <filename name="**/TestRealCluster.java"/>
+          </not>
+          <not>
+            <filename name="**/TestColumnSecurity.java"/>
+          </not>
+        </fileset> 
       </batchtest>
     </junit>
     <fail if="tests.failed">Tests failed!</fail>

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Thu Nov  5 21:02:57 2009
@@ -56,6 +56,7 @@
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.parser.TableSchemaParser;
 import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -82,13 +83,11 @@
   private final static String BT_SCHEMA_FILE = ".btschema";
   // schema version
   private final static Version SCHEMA_VERSION =
-      new Version((short) 1, (short) 0);
+      new Version((short) 1, (short) 1);
   // name of the BasicTable meta-data file
   private final static String BT_META_FILE = ".btmeta";
   // column group prefix
   private final static String CGPathPrefix = "CG";
-  // default comparator to "memcmp"
-  private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
 
   private final static String DELETED_CG_PREFIX = ".deleted-";
   
@@ -288,7 +287,8 @@
         schema = schemaFile.getLogical();
         projection = new Projection(schema);
         String storage = schemaFile.getStorageString();
-        partition = new Partition(schema, projection, storage);
+        String comparator = schemaFile.getComparator();
+        partition = new Partition(schema, projection, storage, comparator);
         for (int nx = 0; nx < numCGs; nx++) {
           if (!schemaFile.isCGDeleted(nx)) {
             colGroups[nx] =
@@ -349,6 +349,14 @@
     }
 
     /**
+     * @return the list of sorted columns
+     */
+    public SortInfo getSortInfo()
+    {
+      return schemaFile.getSortInfo();
+    }
+
+    /**
      * Set the projection for the reader. This will affect calls to
      * {@link #getScanner(RangeSplit, boolean)},
      * {@link #getScanner(BytesWritable, BytesWritable, boolean)},
@@ -368,7 +376,7 @@
         this.projection = new Projection(schemaFile.getLogical());
         partition =
             new Partition(schemaFile.getLogical(), this.projection, schemaFile
-                .getStorageString());
+                .getStorageString(), schemaFile.getComparator());
       }
       else {
         /**
@@ -379,7 +387,7 @@
             new Projection(schemaFile.getLogical(), projection);
         partition =
             new Partition(schemaFile.getLogical(), this.projection, schemaFile
-                .getStorageString());
+                .getStorageString(), schemaFile.getComparator());
       }
       inferredMapping = false;
     }
@@ -432,7 +440,7 @@
            kd.add(colGroups[nx].getKeyDistribution(n));
         }
       }
-      if (kd.size() > (int) (n * 1.5)) {
+      if (n >= 0 && kd.size() > (int) (n * 1.5)) {
         kd.resize(n);
       }
       return kd;
@@ -1031,6 +1039,8 @@
     private boolean closed = true;
     ColumnGroup.Writer[] colGroups;
     Partition partition;
+    boolean sorted;
+    private boolean finished;
     Tuple[] cgTuples;
 
     /**
@@ -1056,35 +1066,34 @@
      *          implementation, the schema of a table is a comma or
      *          semicolon-separated list of column names, such as
      *          "FirstName, LastName; Sex, Department".
-     * @param sorted
-     *          Whether the table to be created is sorted or not. If set to
-     *          true, we expect the rows inserted by every inserter created from
-     *          this Writer must be sorted. Additionally, there exists an
-     *          ordering of the inserters Ins-1, Ins-2, ... such that the rows
-     *          created by Ins-1, followed by rows created by Ins-2, ... form a
-     *          total order.
+     * @param sortColumns
+     *          String of comma-separated sorted columns: null for unsorted tables
+     * @param comparator
+     *          Name of the comparator used in sorted tables
      * @param conf
      *          Optional Configuration objects.
      * 
      * @throws IOException
      * @see Schema
      */
-    public Writer(Path path, String btSchemaString, String btStorageString,
-        boolean sorted, Configuration conf) throws IOException {
+    public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
+        String comparator, Configuration conf) throws IOException {
       try {
         schemaFile =
-            new SchemaFile(path, btSchemaString, btStorageString,
-                DEFAULT_COMPARATOR, sorted, conf);
+            new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
+                comparator, conf);
         partition = schemaFile.getPartition();
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         colGroups = new ColumnGroup.Writer[numCGs];
         cgTuples = new Tuple[numCGs];
+        sorted = schemaFile.isSorted();
         for (int nx = 0; nx < numCGs; nx++) {
           colGroups[nx] =
               new ColumnGroup.Writer( 
                  new Path(path, schemaFile.getName(nx)),
             		 schemaFile.getPhysicalSchema(nx), 
             		 sorted, 
+                 comparator,
             		 schemaFile.getName(nx),
             		 schemaFile.getSerializer(nx), 
             		 schemaFile.getCompressor(nx), 
@@ -1130,7 +1139,16 @@
     }
 
     /**
-     * Reopen an already created BasicTable for writing. Excepiton will be
+     * A wrapper constructor for backward compatibility
+     */
+    public Writer(Path path, String btSchemaString, String btStorageString,
+        Configuration conf) throws IOException {
+      this(path, btSchemaString, btStorageString, null, null, conf);
+    }
+
+    /**
+     * Reopen an already created BasicTable for writing. Exception will be
      * thrown if the table is already closed, or is in the process of being
      * closed.
      */
@@ -1139,6 +1157,7 @@
         schemaFile = new SchemaFile(path, conf);
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
+        sorted = schemaFile.isSorted();
         colGroups = new ColumnGroup.Writer[numCGs];
         cgTuples = new Tuple[numCGs];
         for (int nx = 0; nx < numCGs; nx++) {
@@ -1184,8 +1203,8 @@
      * make the table immutable.
      */
     public void finish() throws IOException {
-      if (closed) return;
-      closed = true;
+      if (finished) return;
+      finished = true;
       try {
         for (int nx = 0; nx < colGroups.length; nx++) {
           if (colGroups[nx] != null) {
@@ -1220,6 +1239,8 @@
     public void close() throws IOException {
       if (closed) return;
       closed = true;
+      if (!finished)
+        finish();
       try {
         for (int nx = 0; nx < colGroups.length; nx++) {
           if (colGroups[nx] != null) {
@@ -1254,6 +1275,22 @@
     public Schema getSchema() {
       return schemaFile.getLogical();
     }
+    
+    /**
+     * @return whether the table is sorted
+     */
+    public boolean isSorted() {
+    	return sorted;
+    }
+
+    /**
+     * Get the list of sorted columns.
+     * @return the list of sorted columns
+     */
+    public SortInfo getSortInfo()
+    {
+      return schemaFile.getSortInfo();
+    }
 
     /**
      * Get a inserter with a given name.
@@ -1386,7 +1423,7 @@
           }
           if (finishWriter) {
             try {
-              BasicTable.Writer.this.close();
+              BasicTable.Writer.this.finish();
             }
             catch (Exception e) {
               // no-op
@@ -1418,6 +1455,7 @@
     Schema[] physical;
     Partition partition;
     boolean sorted;
+    SortInfo sortInfo = null;
     String storage;
     CGSchema[] cgschemas;
     
@@ -1436,17 +1474,21 @@
     }
 
     // ctor for writing
-    public SchemaFile(Path path, String btSchemaStr, String btStorageStr,
-        String btComparator, boolean sorted, Configuration conf)
+    public SchemaFile(Path path, String btSchemaStr, String btStorageStr, String sortColumns,
+        String btComparator, Configuration conf)
         throws IOException {
       storage = btStorageStr;
-      this.comparator = btComparator;
       try {
-        partition = new Partition(btSchemaStr, btStorageStr);
+        partition = new Partition(btSchemaStr, btStorageStr, btComparator, sortColumns);
       }
       catch (Exception e) {
         throw new IOException("Partition constructor failed :" + e.getMessage());
       }
+      this.sortInfo = partition.getSortInfo();
+      this.sorted = partition.isSorted();
+      this.comparator = (this.sortInfo == null ? null : this.sortInfo.getComparator());
+      if (this.comparator == null)
+        this.comparator = "";
       logical = partition.getSchema();
       cgschemas = partition.getCGSchemas();
       physical = new Schema[cgschemas.length];
@@ -1454,7 +1496,7 @@
         physical[nx] = cgschemas[nx].getSchema();
       }
       cgDeletedFlags = new boolean[physical.length];
-      this.sorted = sorted;
+
       version = SCHEMA_VERSION;
 
       // write out the schema
@@ -1473,6 +1515,10 @@
       return sorted;
     }
 
+    public SortInfo getSortInfo() {
+      return sortInfo;
+    }
+
     public Schema getLogical() {
       return logical;
     }
@@ -1555,6 +1601,15 @@
         WritableUtils.writeString(outSchema, physical[nx].toString());
       }
       WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+      WritableUtils.writeVInt(outSchema, sortInfo == null ? 0 : sortInfo.size());
+      if (sortInfo != null && sortInfo.size() > 0)
+      {
+        String[] sortedCols = sortInfo.getSortColumnNames();
+        for (int i = 0; i < sortInfo.size(); i++)
+        {
+          WritableUtils.writeString(outSchema, sortedCols[i]);
+        }
+      }
       outSchema.close();
     }
 
@@ -1583,7 +1638,7 @@
       }
       storage = WritableUtils.readString(in);
       try {
-        partition = new Partition(logicalStr, storage);
+        partition = new Partition(logicalStr, storage, comparator);
       }
       catch (Exception e) {
         throw new IOException("Partition constructor failed :" + e.getMessage());
@@ -1606,6 +1661,19 @@
       }
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
       setCGDeletedFlags(path, conf);
+      if (version.compareTo(new Version((short)1, (short)0)) > 0)
+      {
+        int numSortColumns = WritableUtils.readVInt(in);
+        if (numSortColumns > 0)
+        {
+          String[] sortColumnStr = new String[numSortColumns];
+          for (int i = 0; i < numSortColumns; i++)
+          {
+            sortColumnStr[i] = WritableUtils.readString(in);
+          }
+          sortInfo = SortInfo.parse(SortInfo.toSortString(sortColumnStr), logical, comparator);
+        }
+      }
       in.close();
     }
 
@@ -1677,10 +1745,25 @@
     Path path = new Path(file);
     try {
       BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+      String schemaStr = reader.getBTSchemaString();
+      String storageStr = reader.getStorageString();
       IOutils.indent(out, indent);
-      out.printf("Schema : %s\n", reader.getBTSchemaString());
+      out.printf("Schema : %s\n", schemaStr);
       IOutils.indent(out, indent);
-      out.printf("Storage Information : %s\n", reader.getStorageString());
+      out.printf("Storage Information : %s\n", storageStr);
+      SortInfo sortInfo = reader.getSortInfo();
+      if (sortInfo != null && sortInfo.size() > 0)
+      {
+        IOutils.indent(out, indent);
+        String[] sortedCols = sortInfo.getSortColumnNames();
+        out.println("Sorted Columns :");
+        for (int nx = 0; nx < sortedCols.length; nx++) {
+          if (nx > 0)
+            out.printf(" , ");
+          out.printf("%s", sortedCols[nx]);
+        }
+        out.printf("\n");
+      }
       IOutils.indent(out, indent);
       out.println("Column Groups within the Basic Table :");
       for (int nx = 0; nx < reader.colGroups.length; nx++) {
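
For reference, the Writer change above replaces the old boolean "sorted" flag with explicit sort columns and a comparator name. Below is a minimal sketch of creating a sorted table through the new constructor; the path, schema, storage hint, and the "memcmp" comparator name are illustrative assumptions, not taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.io.BasicTable;
    import org.apache.hadoop.zebra.types.SortInfo;

    public class SortedTableWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/sorted-table");  // hypothetical output path
        // New constructor form: schema, storage hint, sort columns, comparator.
        BasicTable.Writer writer = new BasicTable.Writer(
            path, "a:string, b:int", "[a, b]", "a", "memcmp", conf);
        SortInfo sortInfo = writer.getSortInfo();   // reflects sort column "a"
        System.out.println("sorted = " + writer.isSorted()
            + ", sort columns = " + sortInfo.size());
        writer.finish();  // finish() freezes the table; close() now calls it too
      }
    }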

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Thu Nov  5 21:02:57 2009
@@ -1051,7 +1051,7 @@
     Configuration conf;
     FileSystem fs;
     CGSchema cgschema;
-    private boolean finished;
+    private boolean finished, closed;
 
     /**
      * Create a ColumnGroup writer. The semantics are as follows:
@@ -1095,11 +1095,25 @@
     public Writer(Path path, String schema, boolean sorted, String name, String serializer,
         String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
         throws IOException, ParseException {
-      this(path, new Schema(schema), sorted, name, serializer, compressor, owner, group, perm, overwrite,
+      this(path, new Schema(schema), sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
           conf);
     }
     
     public Writer(Path path, Schema schema, boolean sorted, String name, String serializer,
+        String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+        throws IOException, ParseException {
+      this(path, schema, sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
+          conf);
+    }
+
+    public Writer(Path path, String schema, boolean sorted, String comparator, String name, String serializer,
+        String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+        throws IOException, ParseException {
+      this(path, new Schema(schema), sorted, comparator, name, serializer, compressor, owner, group, perm, overwrite,
+          conf);
+    }
+
+    public Writer(Path path, Schema schema, boolean sorted, String comparator, String name, String serializer,
         String compressor, String owner, String group, short perm, boolean overwrite, Configuration conf)
         throws IOException, ParseException {
       this.path = path;
@@ -1118,7 +1132,7 @@
 
       checkPath(path, true);
 
-      cgschema = new CGSchema(schema, sorted, name, serializer, compressor, owner, group, perm);
+      cgschema = new CGSchema(schema, sorted, comparator, name, serializer, compressor, owner, group, perm);
       CGSchema sfNew = CGSchema.load(fs, path);
       if (sfNew != null) {
         // compare input with on-disk schema.
@@ -1162,7 +1176,10 @@
     @Override
     public void close() throws IOException {
       if (!finished) {
-        finished = true;
+        finish();
+      }
+      if (!closed) {
+        closed = true;
         createIndex();
       }
     }
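
The new Writer overloads above thread an optional comparator name down to the column-group level, with the older forms delegating a null comparator. A sketch of the extended constructor follows; the path, schema, serializer ("pig"), compressor ("gz"), and ownership values are all assumptions for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.io.ColumnGroup;
    import org.apache.hadoop.zebra.schema.Schema;

    public class CGWriterSketch {
      public static ColumnGroup.Writer open(Configuration conf) throws Exception {
        return new ColumnGroup.Writer(
            new Path("/tmp/table/CG0"),     // hypothetical column-group path
            new Schema("a:string, b:int"),  // column-group schema
            true,                           // sorted
            "memcmp",                       // comparator (the new parameter)
            "CG0",                          // column-group name
            "pig",                          // serializer
            "gz",                           // compressor
            null, null, (short) -1,         // owner, group, perm
            true,                           // overwrite
            conf);
      }
    }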

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Thu Nov  5 21:02:57 2009
@@ -21,7 +21,18 @@
 import java.io.DataOutputStream;
 import java.io.PrintStream;
 
+/**
+ * Helper for Zebra I/O
+ */
 public class IOutils {
+  /**
+   * Indent by a number of spaces
+   *
+   * @param os
+   *          the print stream into which the indent spaces are inserted
+   * @param amount
+   *          the number of spaces to indent
+   */
   public static void indent(PrintStream os, int amount)
   {
     for (int i = 0; i < amount; i++)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Thu Nov  5 21:02:57 2009
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.io.file.tfile.ByteArray;
 
 /**
  * Class used to convey the information of how on-disk data are distributed
@@ -149,7 +150,8 @@
    * Get the block distribution of all data that maps to the key bucket.
    */
   public BlockDistribution getBlockDistribution(BytesWritable key) {
-    BlockDistribution bInfo = data.get(key);
+    ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+    BlockDistribution bInfo = data.get(key0);
     if (bInfo == null) {
       throw new IllegalArgumentException("Invalid key");
     }
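
The two-line fix above works because the lookup map appears to be keyed by RawComparable implementations such as ByteArray (hence the new import), while callers pass a BytesWritable, which does not implement that interface; wrapping the same bytes restores a comparable key. A minimal sketch of the adapter idiom, under that assumption:

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.file.tfile.ByteArray;
    import org.apache.hadoop.io.file.tfile.RawComparable;

    public class KeyLookupSketch {
      // Wrap a BytesWritable so it can match ByteArray-keyed map entries;
      // both then expose the same buffer/offset/size view for comparison.
      public static RawComparable asRawComparable(BytesWritable key) {
        return new ByteArray(key.get(), 0, key.getSize());
      }
    }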

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Thu Nov  5 21:02:57 2009
@@ -93,7 +93,6 @@
   
   /**
    * Get the projection's schema
-   * @return
    */
   public Schema getSchema();
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java Thu Nov  5 21:02:57 2009
@@ -16,6 +16,6 @@
  */
 
 /**
- * Physical I/O management of Hadoop Tables.
+ * Physical I/O management of Hadoop Zebra Tables.
  */
 package org.apache.hadoop.zebra.io;

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Thu Nov  5 21:02:57 2009
@@ -30,8 +30,11 @@
 import org.apache.hadoop.zebra.io.TableInserter;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
 
 /**
  * {@link org.apache.hadoop.mapred.OutputFormat} class for creating a
@@ -113,7 +116,10 @@
   private static final String OUTPUT_SCHEMA = "mapred.lib.table.output.schema";
   private static final String OUTPUT_STORAGEHINT =
       "mapred.lib.table.output.storagehint";
-  private static final String OUTPUT_SORTED = "mapred.lib.table.output.sorted";
+  private static final String OUTPUT_SORTCOLUMNS =
+      "mapred.lib.table.output.sortcolumns";
+  private static final String OUTPUT_COMPARATOR =
+      "mapred.lib.table.output.comparator";
 
   /**
    * Set the output path of the BasicTable in JobConf
@@ -174,6 +180,58 @@
     return new Schema(schema);
   }
 
+  private static KeyGenerator makeKeyBuilder(byte[] elems) {
+	    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+	    for (int i = 0; i < elems.length; ++i) {
+	      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+	    }
+	    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
+
+  /**
+   * Generates a Zebra-specific sort key generator which is used to generate
+   * BytesWritable keys; the table's sort key column(s) are used to build this object
+   * 
+   * @param conf
+   *          The JobConf object.
+   * @return Object of type zebra.pig.comparator.KeyGenerator. 
+   *         
+   */
+  public static Object getSortKeyGenerator(JobConf conf) throws IOException, ParseException {
+
+    SortInfo sortInfo = getSortInfo(conf);
+    Schema schema     = getSchema(conf);
+    String[] sortColNames = sortInfo.getSortColumnNames();
+
+    byte[] types = new byte[sortColNames.length];
+    for(int i =0 ; i < sortColNames.length; ++i){
+      types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+    }
+    KeyGenerator builder = makeKeyBuilder(types);
+    return builder;
+
+  }
+
+
+  /**
+   * Generates a BytesWritable key for the input tuple
+   * using the key generator provided; the table's sort key(s) determine the result
+   *
+   * @param builder
+   *         Opaque key generator created by getSortKeyGenerator() method
+   * @param t
+   *         Tuple to create sort key from
+   * @return BytesWritable key
+   *
+   */
+  public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+	  KeyGenerator kg = (KeyGenerator) builder;
+	  return kg.generateKey(t);
+  }
+
+
+
+
   /**
    * Set the table storage hint in JobConf, should be called after setSchema is
    * called.
@@ -194,7 +252,7 @@
       throw new ParseException("Schema has not been set");
 
     // for sanity check purpose only
-    Partition partition = new Partition(schema, storehint);
+    Partition partition = new Partition(schema, storehint, null);
 
     conf.set(OUTPUT_STORAGEHINT, storehint);
   }
@@ -214,34 +272,84 @@
   }
 
   /**
-   * Set sorted-ness of the table. It is disabled now (by making it package
-   * private). So only unsorted BasicTables may be created for now.
-   * 
-   * TODO: must also allow users to specify customized comparator.
+   * Set the sort info
+   *
+   * @param conf
+   *          The JobConf object.
+   *          
+   * @param sortColumns
+   *          Comma-separated sort column names
+   *          
+   * @param comparator
+   *          comparator class name; null for default
+   *
+   */
+  public static void setSortInfo(JobConf conf, String sortColumns, String comparator) {
+    conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+    conf.set(OUTPUT_COMPARATOR, comparator);
+  }
+
+  /**
+   * Set the sort info
+   *
+   * @param conf
+   *          The JobConf object.
+   *          
+   * @param sortColumns
+   *          Comma-separated sort column names
    */
-  public static void setSorted(JobConf conf, boolean sorted) {
-    conf.setBoolean(OUTPUT_SORTED, sorted);
+  public static void setSortInfo(JobConf conf, String sortColumns) {
+	  conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+  }
+  
+  /**
+   * Get the SortInfo object 
+   *
+   * @param conf
+   *          The JobConf object.
+   * @return SortInfo object; null if the Zebra table is unsorted 
+   *
+   */
+  public static SortInfo getSortInfo(JobConf conf)throws IOException
+  {
+    String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+    if (sortColumns == null)
+    	return null;
+    Schema schema = null;
+    try {
+      schema = getSchema(conf);
+    } catch (ParseException e) {
+    	throw new IOException("Schema parsing failure : "+e.getMessage());
+    }
+    if (schema == null)
+    	throw new IOException("Schema not defined");
+    String comparator = getComparator(conf);
+    return SortInfo.parse(sortColumns, schema, comparator);
   }
 
   /**
-   * Is the table to be created should be sorted? It is disabled now (by making
-   * it package private).
+   * Get the comparator for sort columns
+   *
+   * @param conf
+   *          The JobConf object.
+   * @return comparator String
+   *
    */
-  static boolean getSorted(JobConf conf) {
-    return conf.getBoolean(OUTPUT_SORTED, false);
+  private static String getComparator(JobConf conf)
+  {
+    return conf.get(OUTPUT_COMPARATOR);
   }
 
   /**
    * Get the output table as specified in JobConf. It is useful for applications
    * to add more meta data after all rows have been added to the table.
-   * Currently it is disabled (by setting it to package private).
    * 
    * @param conf
    *          The JobConf object.
    * @return The output BasicTable.Writer object.
    * @throws IOException
    */
-  public static BasicTable.Writer getOutput(JobConf conf) throws IOException {
+  private static BasicTable.Writer getOutput(JobConf conf) throws IOException {
     String path = conf.get(OUTPUT_PATH);
     if (path == null) {
       throw new IllegalArgumentException("Cannot find output path");
@@ -268,16 +376,17 @@
     if (schema == null) {
       throw new IllegalArgumentException("Cannot find output schema");
     }
-    String storehint;
+    String storehint, sortColumns, comparator;
     try {
       storehint = getStorageHint(conf);
+      sortColumns = (getSortInfo(conf) == null ? null : SortInfo.toSortString(getSortInfo(conf).getSortColumnNames()));
+      comparator = getComparator(conf);
     }
     catch (ParseException e) {
       throw new IOException(e);
     }
     BasicTable.Writer writer =
-        new BasicTable.Writer(new Path(path), schema, storehint,
-            getSorted(conf), conf); // will
+        new BasicTable.Writer(new Path(path), schema, storehint, sortColumns, comparator, conf);
 
     writer.finish();
   }
@@ -299,14 +408,13 @@
 
   /**
    * Close the output BasicTable, No more rows can be added into the table. A
-   * BasicTable is not visible for reading until it is "closed". This call is
-   * required for sorted TFile, but not required for unsorted TFile.
+   * BasicTable is not visible for reading until it is "closed".
    * 
    * @param conf
    *          The JobConf object.
    * @throws IOException
    */
-  static void close(JobConf conf) throws IOException {
+  public static void close(JobConf conf) throws IOException {
     BasicTable.Writer table = getOutput(conf);
     table.close();
   }
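
Taken together, the output-side methods above replace the formerly package-private setSorted()/getSorted() pair. A short sketch of how a job might wire them up and derive the sort key for each emitted row; the schema and column names are assumptions for illustration.

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
    import org.apache.pig.data.Tuple;

    public class SortedOutputSketch {
      public static void configure(JobConf conf) throws Exception {
        BasicTableOutputFormat.setSchema(conf, "word:string, count:int");
        BasicTableOutputFormat.setStorageHint(conf, "[word, count]");
        // One-argument overload: sort on "word" with the default comparator.
        BasicTableOutputFormat.setSortInfo(conf, "word");
      }

      // Derive the BytesWritable sort key for a row about to be written.
      public static BytesWritable keyFor(JobConf conf, Tuple row) throws Exception {
        Object keyGen = BasicTableOutputFormat.getSortKeyGenerator(conf);
        return BasicTableOutputFormat.getSortKey(keyGen, row);
      }
    }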

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Thu Nov  5 21:02:57 2009
@@ -34,6 +34,7 @@
  * Table Expression - expression to describe an input table.
  */
 abstract class TableExpr {
+  private boolean sorted = false;
   /**
    * Factory method to create a TableExpr from a string.
    * 
@@ -179,7 +180,14 @@
    * @return Whether this expression may only be split by key.
    */
   public boolean sortedSplitRequired() {
-    return false;
+    return sorted;
+  }
+
+  /**
+   * Set the requirement for sorted table
+   */
+  public void setSortedSplit() {
+    sorted = true;
   }
 
   /**

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Thu Nov  5 21:02:57 2009
@@ -24,7 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -46,6 +46,8 @@
 import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.mapred.TableExpr.LeafTableInfo;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -133,6 +135,7 @@
 public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
   private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
   private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+  private static final String INPUT_SORT = "mapred.lib.table.input.sort";
 
   /**
    * Set the paths to the input table.
@@ -158,6 +161,45 @@
       setInputExpr(conf, expr);
     }
   }
+  
+  //This method escapes commas in the glob pattern of the given paths. 
+  private static String[] getPathStrings(String commaSeparatedPaths) {
+    int length = commaSeparatedPaths.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+    
+    for (int i=0; i<length; i++) {
+      char ch = commaSeparatedPaths.charAt(i);
+      switch(ch) {
+        case '{' : {
+          curlyOpen++;
+          if (!globPattern) {
+            globPattern = true;
+          }
+          break;
+        }
+        case '}' : {
+          curlyOpen--;
+          if (curlyOpen == 0 && globPattern) {
+            globPattern = false;
+          }
+          break;
+        }
+        case ',' : {
+          if (!globPattern) {
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+            pathStart = i + 1 ;
+          }
+          break;
+        }
+      }
+    }
+    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+    
+    return pathStrings.toArray(new String[0]);
+  }
 
   /**
    * Set the input expression in the JobConf object.
@@ -181,6 +223,18 @@
     StringReader in = new StringReader(expr);
     return TableExpr.parse(in);
   }
+  
+  /**
+   * Get the schema of a table expr
+   * 
+   * @param conf
+   *          JobConf object.
+   */
+  public static Schema getSchema(JobConf conf) throws IOException
+  {
+	  TableExpr expr = getInputExpr(conf);
+	  return expr.getSchema(conf);
+  }
 
   /**
    * Set the input projection in the JobConf object.
@@ -220,15 +274,171 @@
   }
   
   /**
+   * Set requirement for sorted table
+   *
+   * @param conf
+   *          JobConf object.
+   */
+  private static void setSorted(JobConf conf) {
+    conf.setBoolean(INPUT_SORT, true);
+  }
+  
+  /**
+   * Get the SortInfo object regarding a Zebra table
+   *
+   * @param conf
+   *          JobConf object
+   * @return the Zebra table's SortInfo; null if the table is unsorted.
+   */
+  public static SortInfo getSortInfo(JobConf conf) throws IOException
+  {
+	  TableExpr expr = getInputExpr(conf);
+	  SortInfo result = null;
+	  int sortSize = 0;
+	  if (expr instanceof BasicTableExpr)
+    {
+      BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+      SortInfo sortInfo = reader.getSortInfo();
+      reader.close();
+      result = sortInfo;
+	  } else {
+      List<LeafTableInfo> leaves = expr.getLeafTables(null);
+      for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+      {
+        LeafTableInfo leaf = it.next();
+        BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+        SortInfo sortInfo = reader.getSortInfo();
+        reader.close();
+        if (sortSize == 0)
+        {
+          sortSize = sortInfo.size();
+          result = sortInfo;
+        } else if (sortSize != sortInfo.size()) {
+          throw new IOException("Tables of the table union do not possess the same sort property.");
+        }
+		  }
+	  }
+	  return result;
+  }
+
+  /**
+   * Requires sorted table or table union
+   * 
+   * @param conf
+   *          JobConf object.
+   * @param sortcolumns
+   *          Sort column names.
+   * @param comparator
+   *          Comparator name. Null means the caller does not care, but a table union
+   *          will still check that comparators are identical as a minimum sanity check
+   *        
+   */
+  public static void requireSortedTable(JobConf conf, String sortcolumns, String comparator) throws IOException {
+	 TableExpr expr = getInputExpr(conf);
+	 if (expr instanceof BasicTableExpr)
+	 {
+		 BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+		 SortInfo sortInfo = reader.getSortInfo();
+		 reader.close();
+		 if (comparator == null && sortInfo != null)
+			 // cheat the equals method's comparator comparison
+			 comparator = sortInfo.getComparator();
+		 if (sortInfo == null || !sortInfo.equals(sortcolumns, comparator))
+		 {
+			 throw new IOException("The table is not (properly) sorted");
+		 }
+	 } else {
+		 List<LeafTableInfo> leaves = expr.getLeafTables(null);
+		 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+		 {
+			 LeafTableInfo leaf = it.next();
+			 BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+			 SortInfo sortInfo = reader.getSortInfo();
+			 reader.close();
+			 if (comparator == null && sortInfo != null)
+				 comparator = sortInfo.getComparator(); // use the first table's comparator as comparison base
+			 if (sortInfo == null || !sortInfo.equals(sortcolumns, comparator))
+			 {
+				 throw new IOException("The table is not (properly) sorted");
+			 }
+		 }
+		 // need key range input splits for sorted table union
+		 setSorted(conf);
+	 }
+  }
+  
+  /**
+   * Requires sorted table or table union. For table union, leading sort columns 
+   * of component tables need to be the same
+   * 
+   * @param conf
+   *          JobConf object.
+   *        
+   */
+  public static void requireSortedTable(JobConf conf) throws IOException {
+	 TableExpr expr = getInputExpr(conf);
+	 if (expr instanceof BasicTableExpr)
+	 {
+		 BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+		 SortInfo sortInfo = reader.getSortInfo();
+		 reader.close();
+		 if (sortInfo == null)
+		 {
+			 throw new IOException("The table is not (properly) sorted");
+		 }
+	 } else {
+		 List<LeafTableInfo> leaves = expr.getLeafTables(null);
+		 String sortcolumns = null, comparator = null;
+		 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+		 {
+			 LeafTableInfo leaf = it.next();
+			 BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+			 SortInfo sortInfo = reader.getSortInfo();
+			 reader.close();
+			 if (sortInfo == null)
+			 {
+				 throw new IOException("The table is not (properly) sorted");
+			 }
+			 // check the compatible sort info among the member tables of a union
+			 if (sortcolumns == null)
+			 {
+				 sortcolumns = SortInfo.toSortString(sortInfo.getSortColumnNames());
+				 comparator = sortInfo.getComparator();
+			 } else {
+				 if (!sortInfo.equals(sortcolumns, comparator))
+				 {
+					 throw new IOException("The table is not (properly) sorted");
+				 }
+			 }
+		 }
+		 // need key range input splits for sorted table union
+		 setSorted(conf);
+	 }
+  }
+  /**
+   * Get whether sorted-table input splits are required
+   *
+   * @param conf
+   *          JobConf object.
+   */
+  private static boolean getSorted(JobConf conf) {
+    return conf.getBoolean(INPUT_SORT, false);
+  }
+
+  /**
    * @see InputFormat#getRecordReader(InputSplit, JobConf, Reporter)
    */
   @Override
   public RecordReader<BytesWritable, Tuple> getRecordReader(InputSplit split,
       JobConf conf, Reporter reporter) throws IOException {
-    TableExpr expr = getInputExpr(conf);
+  TableExpr expr = getInputExpr(conf);
     if (expr == null) {
       throw new IOException("Table expression not defined");
     }
+
+    if (getSorted(conf))
+      expr.setSortedSplit();
+
     String strProj = conf.get(INPUT_PROJ);
     String projection = null;
     try {
@@ -241,16 +451,37 @@
     } catch (ParseException e) {
     	throw new IOException("Projection parsing failed : "+e.getMessage());
     }
+
     try {
       return new TableRecordReader(expr, projection, split, conf);
     } catch (ParseException e) {
     	throw new IOException("Projection parsing faile : "+e.getMessage());
     }
   }
+  
+  /**
+   * Get a TableRecordReader on a single split
+   * 
+   * @param conf
+   *          JobConf object.
+   * @param projection
+   *          comma-separated column names in projection. null means all columns in projection
+   */
+  
+  public TableRecordReader getTableRecordReader(JobConf conf, String projection) throws IOException, ParseException
+  {
+	// a single split is needed
+    if (projection != null)
+    	setProjection(conf, projection);
+    TableInputFormat inputFormat = new TableInputFormat();
+    InputSplit[] splits = inputFormat.getSplits(conf, 1);
+    return (TableRecordReader) getRecordReader(splits[0], conf, Reporter.NULL);
+  }
 
   private static InputSplit[] getSortedSplits(JobConf conf, int numSplits,
       TableExpr expr, List<BasicTable.Reader> readers,
       List<BasicTableStatus> status) throws IOException {
+
     if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
       throw new IOException("Unable to created sorted splits");
     }
@@ -410,6 +641,8 @@
   public InputSplit[] getSplits(JobConf conf, int numSplits)
       throws IOException {
     TableExpr expr = getInputExpr(conf);
+    if (getSorted(conf))
+      expr.setSortedSplit();
     if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
       throw new IOException("Unable to created sorted splits");
     }
@@ -542,6 +775,12 @@
     if (bool == 1) {
       end.readFields(in);
     }
+    length = WritableUtils.readVLong(in);
+    int size = WritableUtils.readVInt(in);
+    if (size > 0)
+      hosts = new String[size];
+    for (int i = 0; i < size; i++)
+    	hosts[i] = WritableUtils.readString(in);
   }
 
   @Override
@@ -560,6 +799,12 @@
       WritableUtils.writeVInt(out, 1);
       end.write(out);
     }
+    WritableUtils.writeVLong(out, length);
+    WritableUtils.writeVInt(out, hosts == null ? 0 : hosts.length);
+    for (int i = 0; hosts != null && i < hosts.length; i++)
+    {
+    	WritableUtils.writeString(out, hosts[i]);
+    }
   }
   
   public BytesWritable getBegin() {
@@ -644,79 +889,3 @@
     return split;
   }
 }
-
-/**
- * Adaptor class to implement RecordReader on top of Scanner.
- */
-class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
-  private final TableScanner scanner;
-  private long count = 0;
-
-  /**
-   * 
-   * @param expr
-   * @param projection
-   *          projection schema. Should never be null.
-   * @param split
-   * @param conf
-   * @throws IOException
-   */
-  public TableRecordReader(TableExpr expr, String projection,
-      InputSplit split,
-      JobConf conf) throws IOException, ParseException {
-    if (expr.sortedSplitRequired()) {
-      SortedTableSplit tblSplit = (SortedTableSplit) split;
-      scanner =
-          expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
-              conf);
-    }
-    else {
-      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
-      scanner = expr.getScanner(tblSplit, projection, conf);
-    }
-  }
-  
-  @Override
-  public void close() throws IOException {
-    scanner.close();
-  }
-
-  @Override
-  public BytesWritable createKey() {
-    return new BytesWritable();
-  }
-
-  @Override
-  public Tuple createValue() {
-    try {
-      return TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
-    }
-    catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return null;
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return count;
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return (float) ((scanner.atEnd()) ? 1.0 : 0);
-  }
-
-  @Override
-  public boolean next(BytesWritable key, Tuple value) throws IOException {
-    if (scanner.atEnd()) {
-      return false;
-    }
-    scanner.getKey(key);
-    scanner.getValue(value);
-    scanner.advance();
-    ++count;
-    return true;
-  }
-}
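
On the input side, a sketch of how a job might declare that it needs sorted input using the methods added above; the input path and projection are illustrative assumptions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.zebra.mapred.TableInputFormat;
    import org.apache.hadoop.zebra.types.SortInfo;

    public class SortedInputSketch {
      public static void configure(JobConf conf) throws Exception {
        conf.setInputFormat(TableInputFormat.class);
        TableInputFormat.setInputPaths(conf, new Path("/tmp/sorted-table"));
        TableInputFormat.setProjection(conf, "a, b");
        // Fails if the table (or any member of a union) is not properly sorted;
        // for a union this also switches the job to key-range (sorted) splits.
        TableInputFormat.requireSortedTable(conf);
        SortInfo sortInfo = TableInputFormat.getSortInfo(conf);
        System.out.println("leading sort columns: " + sortInfo.size());
      }
    }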

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Adaptor class to implement RecordReader on top of Scanner.
+ */
+public class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
+  private final TableScanner scanner;
+  private long count = 0;
+
+  /**
+   * 
+   * @param expr
+   *          Table expression
+   * @param projection
+   *          projection schema. Should never be null.
+   * @param split
+   *          the split to work on
+   * @param conf
+   *          JobConf object
+   * @throws IOException
+   */
+  public TableRecordReader(TableExpr expr, String projection,
+      InputSplit split,
+      JobConf conf) throws IOException, ParseException {
+    if (expr.sortedSplitRequired()) {
+      SortedTableSplit tblSplit = (SortedTableSplit) split;
+      scanner =
+          expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
+              conf);
+    }
+    else {
+      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
+      scanner = expr.getScanner(tblSplit, projection, conf);
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    scanner.close();
+  }
+
+  @Override
+  public BytesWritable createKey() {
+    return new BytesWritable();
+  }
+
+  @Override
+  public Tuple createValue() {
+    try {
+      return TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+    }
+    catch (IOException e) {
+      // createValue() cannot declare a checked exception; fail fast instead
+      // of swallowing the error and returning null.
+      throw new RuntimeException("Failed to create value tuple", e);
+    }
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return count;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return (float) ((scanner.atEnd()) ? 1.0 : 0);
+  }
+
+  @Override
+  public boolean next(BytesWritable key, Tuple value) throws IOException {
+    if (scanner.atEnd()) {
+      return false;
+    }
+    scanner.getKey(key);
+    scanner.getValue(value);
+    scanner.advance();
+    ++count;
+    return true;
+  }
+  
+  /**
+   * Seek to the first row whose key equals or immediately follows the
+   * given key; only applicable to sorted Zebra tables.
+   *
+   * @param key
+   *          the key to seek to
+   */
+  public boolean seekTo(BytesWritable key) throws IOException {
+    return scanner.seekTo(key);
+  }
+  
+  /**
+   * Check whether the end of the input has been reached.
+   *
+   * @return true if the end of the input has been reached
+   */
+  public boolean atEnd() throws IOException {
+    return scanner.atEnd();
+  }
+}
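
A minimal usage sketch of the new public TableRecordReader (illustrative
only, not code from this commit): expr, projection, split, and jobConf are
placeholders configured elsewhere, and startKey is assumed to be encoded the
same way the sorted table's keys were generated.

  TableRecordReader reader =
      new TableRecordReader(expr, projection, split, jobConf);
  try {
    reader.seekTo(startKey);            // only meaningful on a sorted table
    BytesWritable key = reader.createKey();
    Tuple value = reader.createValue();
    while (!reader.atEnd() && reader.next(key, value)) {
      // process (key, value)
    }
  } finally {
    reader.close();
  }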

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Thu Nov  5 21:02:57 2009
@@ -162,6 +162,7 @@
   CachedTableScanner[] scanners;
   PriorityBlockingQueue<CachedTableScanner> queue;
   boolean synced = false;
+  CachedTableScanner scanner = null; // the working scanner
 
   SortedTableUnionScanner(List<TableScanner> scanners) throws IOException {
     if (scanners.isEmpty()) {
@@ -188,6 +189,9 @@
       TableScanner scanner = scanners.get(i);
       this.scanners[i] = new CachedTableScanner(scanner);
     }
+    // initial fill-in: position the working scanner on the smallest key
+    if (!atEnd())
+      scanner = queue.poll();
   }
   
 
@@ -206,21 +210,18 @@
   @Override
   public boolean advance() throws IOException {
     sync();
-    CachedTableScanner scanner = queue.poll();
-    if (scanner != null) {
-      scanner.advance();
-      if (!scanner.atEnd()) {
-        queue.add(scanner);
-      }
-      return true;
+    scanner.advance();
+    if (!scanner.atEnd()) {
+      queue.add(scanner);
     }
-    return false;
+    scanner = queue.poll();
+    return (scanner != null);
   }
 
   @Override
   public boolean atEnd() throws IOException {
     sync();
-    return queue.isEmpty();
+    return (scanner == null && queue.isEmpty());
   }
 
   @Override
@@ -239,7 +240,6 @@
       throw new EOFException("No more rows to read");
     }
     
-    CachedTableScanner scanner = queue.poll();
     key.set(scanner.getKey());
   }
 
@@ -249,7 +249,6 @@
       throw new EOFException("No more rows to read");
     }
     
-    CachedTableScanner scanner = queue.poll();
     row.reference(scanner.getValue());
   }
 
@@ -260,6 +259,8 @@
       rv = rv || scanner.seekTo(key);
     }
     synced = false;
+    if (!atEnd())
+      scanner = queue.poll();
     return rv;
   }
 
@@ -268,6 +269,7 @@
     for (CachedTableScanner scanner : scanners) {
       scanner.seekToEnd();
     }
+    scanner = null;
     synced = false;
   }
 

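The hunks above switch SortedTableUnionScanner to a working-scanner pattern:
the current minimum is held outside the priority queue so that getKey() and
getValue() no longer poll (and mutate) the queue, and advance() re-inserts
the working scanner before polling the next minimum. A self-contained sketch
of the same k-way merge pattern over plain sorted iterators (illustrative
only, not code from this commit):

  import java.util.Comparator;
  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;

  class MergeSketch implements Iterator<Integer> {
    // A sorted input plus its buffered current value.
    private static class Cursor {
      final Iterator<Integer> it;
      int current;
      Cursor(Iterator<Integer> it) { this.it = it; current = it.next(); }
    }

    private final PriorityQueue<Cursor> queue =
        new PriorityQueue<Cursor>(11, new Comparator<Cursor>() {
          public int compare(Cursor a, Cursor b) {
            return a.current < b.current ? -1 : a.current == b.current ? 0 : 1;
          }
        });
    private Cursor working; // held outside the queue, like the new field above

    MergeSketch(List<Iterator<Integer>> inputs) {
      for (Iterator<Integer> it : inputs)
        if (it.hasNext()) queue.add(new Cursor(it));
      working = queue.poll(); // initial fill-in, as in the constructor hunk
    }

    public boolean hasNext() { return working != null; } // cf. atEnd()

    public Integer next() {
      int result = working.current; // reads never mutate the queue
      if (working.it.hasNext()) {   // advance(): step, re-insert, poll
        working.current = working.it.next();
        queue.add(working);
      }
      working = queue.poll();
      return result;
    }

    public void remove() { throw new UnsupportedOperationException(); }
  }
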
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java Thu Nov  5 21:02:57 2009
@@ -18,7 +18,7 @@
 /**
  * Providing {@link org.apache.hadoop.mapred.InputFormat} and
  * {@link org.apache.hadoop.mapred.OutputFormat} adaptor classes for Hadoop
- * Table.
+ * Zebra Table.
  */
 package org.apache.hadoop.zebra.mapred;
 

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java Thu Nov  5 21:02:57 2009
@@ -32,7 +32,7 @@
  * represent row keys, and PIG {@link org.apache.pig.data.Tuple} objects to
  * represent rows.
  * <p>
- * Each BasicTable maintains a {@link org.apache.hadoop.zebra.types.Schema} ,
+ * Each BasicTable maintains a {@link org.apache.hadoop.zebra.schema.Schema} ,
  * which, for this release, is nothing but a collection of column names. Given a
  * schema, we can deduce the integer index of a particular column, and use it to
  * extract (get) the desired datum from PIG Tuple object (which only allows

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Thu Nov  5 21:02:57 2009
@@ -27,7 +27,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-//import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,11 +40,14 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.mapred.TableRecordReader;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.Slice;
@@ -57,16 +59,25 @@
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.hadoop.zebra.pig.comparator.*;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.hadoop.zebra.io.TableScanner;
 
 /**
- * Pig LoadFunc and Slicer for Table
+ * Pig IndexableLoadFunc and Slicer for Zebra Table
  */
-public class TableLoader implements LoadFunc, Slicer {
+public class TableLoader implements IndexableLoadFunc, Slicer {
 	static final Log LOG = LogFactory.getLog(TableLoader.class);
 	private TableInputFormat inputFormat;
 	private JobConf jobConf;
 	private String projectionString;
 	private Path[] paths;
+	private TableRecordReader indexReader = null;
+	private BytesWritable indexKey = null;
+	private Tuple tuple;
+	private org.apache.hadoop.zebra.schema.Schema schema;
+	private SortInfo sortInfo;
+	private boolean sorted = false;
   
 	/**
 	 * default constructor
@@ -84,12 +95,103 @@
 		projectionString = projectionStr;	  
 	}
 
+	/**
+	 * @param projectionStr
+	 *          projection string passed from the Pig query.
+	 * @param sorted
+	 *          must be the string "sorted": sorted input table(s) are required.
+	 */
+	public TableLoader(String projectionStr, String sorted) throws IOException {
+		inputFormat = new TableInputFormat();
+		if (projectionStr != null && !projectionStr.isEmpty())
+			projectionString = projectionStr;
+		if (sorted.equalsIgnoreCase("sorted"))
+			this.sorted = true;
+		else
+			throw new IOException("Invalid argument to the table loader constructor: " + sorted);
+	}
+
 	@Override
-	public void bindTo(String fileName, BufferedPositionedInputStream is,
-			long offset, long end) throws IOException {
-		throw new IOException("Not implemented");
+	public void initialize(Configuration conf) throws IOException
+	{
+	  if (conf == null)
+	    throw new IOException("Null Configuration passed.");
+	  jobConf = new JobConf(conf);
 	}
+	
+	@Override
+	public void bindTo(String filePaths, BufferedPositionedInputStream is,
+			long offset, long end) throws IOException {
+
+      FileInputFormat.setInputPaths(jobConf, filePaths);
+      Path[] paths = FileInputFormat.getInputPaths(jobConf);
+			/*
+			 * Perform glob pattern matching on the input paths.
+			 */
+			List<Path> result = new ArrayList<Path>(paths.length);
+			for (Path p : paths) {
+				FileSystem fs = p.getFileSystem(jobConf);
+				FileStatus[] matches = fs.globStatus(p);
+				if (matches == null) {
+					LOG.warn("Input path does not exist: " + p);
+				}
+				else if (matches.length == 0) {
+					LOG.warn("Input Pattern " + p + " matches 0 files");
+				} else {
+					for (FileStatus globStat: matches) {
+						if (globStat.isDir()) {
+							result.add(globStat.getPath());
+						} else {
+							LOG.warn("Input path " + globStat.getPath() + " is not a directory");
+						}
+					}
+				}
+			}
+			if (result.isEmpty()) {
+				throw new IOException("No table specified for input");
+			}
+			TableInputFormat.setInputPaths(jobConf, result.toArray(new Path[result.size()]));
 
+      TableInputFormat.requireSortedTable(jobConf);
+      sortInfo = TableInputFormat.getSortInfo(jobConf);
+      schema = TableInputFormat.getSchema(jobConf);
+      int numcols = schema.getNumColumns();
+      tuple = TypesUtils.createTuple(numcols);
+
+      /*
+       * Use all columns of the table as the projection (not optimal); no need
+       * to call TableInputFormat.setProjection, which defaults to all columns.
+       */
+      try {
+        indexReader = inputFormat.getTableRecordReader(jobConf, null);
+      } catch (ParseException e) {
+        throw new IOException("Exception from TableInputFormat.getTableRecordReader: " + e.getMessage());
+      }
+      indexKey = new BytesWritable();
+    }
+
+  @Override
+  public void seekNear(Tuple t) throws IOException
+  {
+    String[] sortColNames = sortInfo.getSortColumnNames();
+    byte[] types = new byte[sortColNames.length];
+    for (int i = 0; i < sortColNames.length; ++i) {
+      types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+    }
+    KeyGenerator builder = makeKeyBuilder(types);
+    BytesWritable key = builder.generateKey(t);
+    indexReader.seekTo(key);
+  }
+
+  private KeyGenerator makeKeyBuilder(byte[] elems) {
+    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+    for (int i = 0; i < elems.length; ++i) {
+      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+    }
+    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
 	/**
 	 * @param storage
 	 * @param location
@@ -145,6 +247,8 @@
 			} catch (ParseException e) {
 				throw new RuntimeException("Schema parsing failed : "+e.getMessage());
 			}
+      if (sorted)
+        TableInputFormat.requireSortedTable(jobConf);
 		}
 	}
   
@@ -215,17 +319,27 @@
 
 	@Override
 	public Tuple getNext() throws IOException {
-		throw new IOException("Not implemented");
+      if (indexReader.atEnd())
+        return null;
+      indexReader.next(indexKey, tuple);
+      return tuple;
 	}
 
+  @Override
+  public void close() throws IOException {
+    if (indexReader != null)
+      indexReader.close();
+  }
+
 	@Override
 	public Slice[] slice(DataStorage store, String location) throws IOException {
 		checkConf(store, location);
 		// TableInputFormat accepts numSplits < 0 (special case for no-hint)
 		InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+
 		Slice[] slices = new Slice[splits.length];
 		for (int nx = 0; nx < slices.length; nx++) {
-			slices[nx] = new TableSlice(jobConf, splits[nx]);
+			slices[nx] = new TableSlice(jobConf, splits[nx], sorted);
 		}
 
 		return slices;
@@ -244,11 +358,12 @@
 		private InputSplit split;
     
 		transient private JobConf conf;
-		transient private String projection;
+		transient private int numProjCols = 0;
 		transient private RecordReader<BytesWritable, Tuple> scanner;
 		transient private BytesWritable key;
+		transient private boolean sorted = false;
 
-		TableSlice(JobConf conf, InputSplit split) {
+		TableSlice(JobConf conf, InputSplit split, boolean sorted) {
 			// hack: expecting JobConf contains nothing but a <string, string>
 			// key-value pair store.
 			configMap = new TreeMap<String, String>();
@@ -257,6 +372,7 @@
 				configMap.put(e.getKey(), e.getValue());
 			}
 			this.split = split;
+			this.sorted = sorted;
 		}
 
 		@Override
@@ -315,20 +431,24 @@
 				localConf.set(e.getKey(), e.getValue());
 			}
 			conf = new JobConf(localConf);
+      String projection;
 			try
-			{
-				projection = TableInputFormat.getProjection(conf);
-			} catch (ParseException e) {
-				throw new IOException("Schema parsing failed :"+e.getMessage());
-			}
+      {
+        projection = TableInputFormat.getProjection(conf);
+      } catch (ParseException e) {
+        throw new IOException("Schema parsing failed :"+e.getMessage());
+      }
+      numProjCols = Projection.getNumColumns(projection);
 			TableInputFormat inputFormat = new TableInputFormat();
+      if (sorted)
+        TableInputFormat.requireSortedTable(conf);
 			scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
 			key = new BytesWritable();
 		}
 
 		@Override
 		public boolean next(Tuple value) throws IOException {
-			TypesUtils.formatTuple(value, projection);
+			TypesUtils.formatTuple(value, numProjCols);
 			return scanner.next(key, value);
 		}
     

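With this change TableLoader implements Pig's IndexableLoadFunc, the
interface a merge join uses to probe the right-hand table. A hypothetical
calling sequence (conf, the table path, and the probe value are placeholders;
the probe tuple supplies one value per sort column):

  TableLoader loader = new TableLoader(null, "sorted"); // all columns
  loader.initialize(conf);                 // conf: a Hadoop Configuration
  loader.bindTo("/path/to/sorted/table", null, 0, Long.MAX_VALUE);

  Tuple probe = TypesUtils.createTuple(1); // one value per sort column
  probe.set(0, "some-join-key");
  loader.seekNear(probe);

  Tuple row;
  while ((row = loader.getNext()) != null) {
    // rows at or after the probe key
  }
  loader.close();
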
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Thu Nov  5 21:02:57 2009
@@ -21,25 +21,37 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Constructor;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
+import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
+import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableInserter;
 import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.StoreConfig;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.CommittableStoreFunc;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
 
-public class TableStorer implements StoreFunc {
+/**
+ * Pig CommittableStoreFunc for Zebra Table
+ */
+public class TableStorer implements CommittableStoreFunc {
 	private String storageHintString;
 
 	public TableStorer() {	  
@@ -78,6 +90,18 @@
 	static public void main(String[] args) throws SecurityException, NoSuchMethodException {
 		Constructor meth = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
 	}
+
+  @Override
+  public void commit(Configuration conf) throws IOException {
+    JobConf job = new JobConf(conf);
+    StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
+    // closing the writer finalizes (commits) the table output
+    BasicTable.Writer writer = new BasicTable.Writer(new Path(storeConfig.getLocation()), job);
+    writer.close();
+  }
 }
 
 /**
@@ -91,6 +115,36 @@
 		StoreConfig storeConfig = MapRedUtil.getStoreConfig(job);
 		String location = storeConfig.getLocation(), schemaStr;
     Schema schema = storeConfig.getSchema();
+    org.apache.pig.SortInfo pigSortInfo = storeConfig.getSortInfo();
+
+    /* TODO
+     * support a custom (home-brewed) comparator?
+     */
+    String comparator = null;
+    String sortColumnNames = null;
+    if (pigSortInfo != null)
+    {
+      List<org.apache.pig.SortColInfo> sortColumns = pigSortInfo.getSortColInfoList();
+      StringBuilder sb = new StringBuilder();
+      if (sortColumns != null && sortColumns.size() > 0)
+      {
+        org.apache.pig.SortColInfo sortColumn;
+        String sortColumnName;
+        for (int i = 0; i < sortColumns.size(); i++)
+        {
+          sortColumn = sortColumns.get(i);
+          sortColumnName = sortColumn.getColName();
+          if (sortColumnName == null)
+            throw new IOException("Zebra does not support column positional reference yet");
+          if (!org.apache.pig.data.DataType.isAtomic(schema.getField(sortColumnName).type))
+            throw new IOException(schema.getField(sortColumnName).alias + " is not a simple type; only simple types are currently supported as sort columns.");
+          if (i > 0)
+            sb.append(",");
+          sb.append(sortColumnName);
+        }
+        sortColumnNames = sb.toString();
+      }
+    }
     try {
       schemaStr = SchemaConverter.fromPigSchema(schema).toString();
     } catch (ParseException e) {
@@ -98,7 +152,7 @@
     }
 		TableStorer storeFunc = (TableStorer)MapRedUtil.getStoreFunc(job);   
 		BasicTable.Writer writer = new BasicTable.Writer(new Path(location), 
-				schemaStr, storeFunc.getStorageHintString(), false, job);
+				schemaStr, storeFunc.getStorageHintString(), sortColumnNames, comparator, job);
 		writer.finish();
 	}
     
@@ -118,6 +172,9 @@
 	final private BytesWritable KEY0 = new BytesWritable(new byte[0]); 
 	private BasicTable.Writer writer;
 	private TableInserter inserter;
+  private int[] sortColIndices = null;
+  private KeyGenerator builder;
+  private Tuple t;
 
 	public TableRecordWriter(String name, JobConf conf) throws IOException {
 		StoreConfig storeConfig = MapRedUtil.getStoreConfig(conf);
@@ -126,6 +183,22 @@
 		// TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
 		// compression, etc.
 		writer = new BasicTable.Writer(new Path(location), conf);
+
+    if (writer.getSortInfo() != null)
+    {
+      SortInfo sortInfo = writer.getSortInfo();
+      sortColIndices = sortInfo.getSortIndices();
+      String[] sortColNames = sortInfo.getSortColumnNames();
+      org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+      byte[] types = new byte[sortColNames.length];
+      for (int i = 0; i < sortColNames.length; ++i) {
+        types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+      }
+      t = TypesUtils.createTuple(sortColNames.length);
+      builder = makeKeyBuilder(types);
+    }
+
 		inserter = writer.getInserter(name, false);
 	}
 
@@ -135,9 +208,23 @@
 		writer.finish();
 	}
 
+  private KeyGenerator makeKeyBuilder(byte[] elems) {
+    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+    for (int i = 0; i < elems.length; ++i) {
+      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+    }
+    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
+
   @Override
   public void write(BytesWritable key, Tuple value) throws IOException {
-    if (key == null) {
+    if (sortColIndices != null)
+    {
+      for (int i = 0; i < sortColIndices.length; ++i) {
+        t.set(i, value.get(sortColIndices[i]));
+      }
+      key = builder.generateKey(t);
+    } else if (key == null) {
       key = KEY0;
     }
     inserter.insert(key, value);
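
TableRecordWriter now derives each row key from the sort columns through
KeyGenerator, whose keys are designed so that raw byte comparison agrees with
the typed ordering of the rows. A sketch of that property, under the
assumption of a single int sort column (DataType.INTEGER standing in for the
pigDataType() byte used above; illustrative only):

  ComparatorExpr[] exprs = {
      ExprUtils.primitiveComparator(0, org.apache.pig.data.DataType.INTEGER) };
  KeyGenerator gen = new KeyGenerator(ExprUtils.tupleComparator(exprs));

  Tuple t1 = TypesUtils.createTuple(1);
  t1.set(0, -5);
  BytesWritable k1 = new BytesWritable();
  k1.set(gen.generateKey(t1)); // copy in case the generator reuses its buffer

  Tuple t2 = TypesUtils.createTuple(1);
  t2.set(0, 7);
  BytesWritable k2 = gen.generateKey(t2);

  // -5 orders before 7, so the encoded keys must compare the same way:
  int c = WritableComparator.compareBytes(k1.getBytes(), 0, k1.getLength(),
      k2.getBytes(), 0, k2.getLength());
  assert c < 0;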

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BagExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+public class BagExpr extends LeafExpr {
+  protected List<LeafGenerator> list;
+  protected final ComparatorExpr expr;
+  protected int el, cel;
+  protected boolean c;
+
+  public BagExpr(int index, ComparatorExpr expr) {
+    super(index);
+    this.expr = expr;
+  }
+
+  @Override
+  protected
+  void appendObject(EncodingOutputStream out, Object object)
+      throws ExecException {
+
+    if (list == null) {
+      // This is the first time we get called. build the execution plan.
+      el = out.getEscapeLevel();
+      cel = out.getComescLevel();
+      c = out.getComplement();
+      list = new ArrayList<LeafGenerator>();
+      // requiring the individual items to be explicitly bounded.
+      expr.appendLeafGenerator(list, el, cel, c, true);
+    }
+
+    DataBag bag = (DataBag) object;
+    for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+      Tuple t = it.next();
+      for (Iterator<LeafGenerator> it2 = list.iterator(); it2.hasNext();) {
+        LeafGenerator g = it2.next();
+        g.append(out, t);
+      }
+    }
+  }
+
+  @Override
+  protected
+  boolean implicitBound() {
+    return true;
+  }
+
+  public void illustrate(PrintStream out, int escapeLevel, int comescLevel,
+      boolean complement) {
+    List<LeafGenerator> l = new ArrayList<LeafGenerator>();
+    expr.appendLeafGenerator(l, escapeLevel, comescLevel, complement, true);
+    out.printf("(%s, %d, [", getType(), index);
+    for (Iterator<LeafGenerator> it = l.iterator(); it.hasNext();) {
+      LeafGenerator leaf = it.next();
+      leaf.illustrate(out);
+    }
+    out.print("])");
+  }
+
+  @Override
+  protected
+  String getType() {
+    return "Bag";
+  }
+
+  @Override
+  protected
+  void toString(PrintStream out) {
+    out.printf("%s(%d, ", getType(), index);
+    expr.toString(out);
+    out.print(")");
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BooleanExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+public final class BooleanExpr extends FixedLengthPrimitive {
+  public BooleanExpr(int index) {
+    super(index, 1);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != 1) {
+      throw new IllegalArgumentException("expecting array length == 1");
+    }
+    boolean l = (Boolean) o;
+    // Avoid using 0, 1, which may require escaping later.
+    b[0] = (byte) (l ? 3 : 2);
+  }
+
+  @Override
+  protected String getType() {
+    return "Boolean";
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ByteExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+public final class ByteExpr extends FixedLengthPrimitive {
+  public ByteExpr(int index) {
+    super(index, 1);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != 1) {
+      throw new IllegalArgumentException("expecting array length == 1");
+    }
+    byte l = (Byte) o;
+    l ^= 1 << (Byte.SIZE - 1);
+    b[0] = l;
+  }
+
+  @Override
+  protected String getType() {
+    return "Byte";
+  }
+}
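
The XOR in convertValue() flips the sign bit so that unsigned, memcmp-style
comparison of the encoded byte agrees with signed byte ordering. A quick
standalone check (illustrative only):

  byte[] vals = { Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE };
  int prev = -1;
  for (byte v : vals) {
    int flipped = (v ^ (1 << (Byte.SIZE - 1))) & 0xff; // same flip as above
    assert flipped > prev : "unsigned order must match signed order";
    prev = flipped;
  }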

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/BytesExpr.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import org.apache.pig.data.DataByteArray;
+
+public final class BytesExpr extends LeafExpr {
+  public BytesExpr(int index) {
+    super(index);
+  }
+
+  @Override
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    DataByteArray bytes = (DataByteArray) o;
+    out.write(bytes.get());
+  }
+
+  @Override
+  protected boolean implicitBound() {
+    return true;
+  }
+
+  @Override
+  protected String getType() {
+    return "ByteArray";
+  }
+}