Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [2/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebra...

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -39,15 +40,18 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
 import org.apache.hadoop.zebra.types.CGSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
@@ -55,6 +59,7 @@
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.parser.TableSchemaParser;
 import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -81,13 +86,11 @@
   private final static String BT_SCHEMA_FILE = ".btschema";
   // schema version
   private final static Version SCHEMA_VERSION =
-      new Version((short) 1, (short) 0);
+      new Version((short) 1, (short) 1);
   // name of the BasicTable meta-data file
   private final static String BT_META_FILE = ".btmeta";
   // column group prefix
   private final static String CGPathPrefix = "CG";
-  // default comparator to "memcmp"
-  private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
 
   private final static String DELETED_CG_PREFIX = ".deleted-";
   
@@ -114,7 +117,11 @@
    *
    * Dropping a column group that has already been removed is a no-op; no
    * exception is thrown.
+   * <br> <br> 
    * 
+   * Note that this feature is currently experimental and subject to change
+   * in the future.
+   *
    * @param path path to BasicTable
    * @param conf Configuration determines file system and other parameters.
    * @param cgName name of the column group to drop.
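
For illustration, a minimal usage sketch of this API (the table path and column group name below are hypothetical):

    Configuration conf = new Configuration();
    Path table = new Path("/user/zebra/table1");   // hypothetical path
    // Drops the column group "CG0"; calling it again is a no-op.
    BasicTable.dropColumnGroup(table, conf, "CG0");
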
@@ -127,8 +134,25 @@
                                      throws IOException {
     
     FileSystem fs = FileSystem.get(conf);
+    int triedCount = 0;
+    int numCGs =  SchemaFile.getNumCGs(path, conf);
+    SchemaFile schemaFile = null;
+    
+    /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
+    while (triedCount ++ < numCGs) {
+      try {
+        schemaFile = new SchemaFile(path, conf);
+        break;
+      } catch (FileNotFoundException e) {
+        LOG.info("Try " + triedCount + " times : " + e.getMessage());
+      } catch (Exception e) {
+        throw new IOException ("Cannot construct SchemaFile : " + e.getMessage());
+      }
+    }
     
-    SchemaFile schemaFile = new SchemaFile(path, conf);
+    if (schemaFile == null) {
+      throw new IOException ("Cannot construct SchemaFile");
+    }
     
     int cgIdx = schemaFile.getCGByName(cgName);
     if (cgIdx < 0) {
@@ -137,9 +161,8 @@
     }
     
     Path cgPath = new Path(path, schemaFile.getName(cgIdx));
-    
-    //Clean up any previous unfinished attempts to drop column groups?
-    
+        
+    //Clean up any previous unfinished attempts to drop column groups?    
     if (schemaFile.isCGDeleted(cgIdx)) {
       // Clean up unfinished delete if it exists. so that clean up can 
       // complete if the previous deletion was interrupted for some reason.
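
The retry loop above bounds its attempts by the number of column groups, presumably because each racing drop can remove at most one CG file between retries. A hedged sketch of the concurrent scenario it tolerates (path and CG names are hypothetical):

    final Configuration conf = new Configuration();
    final Path table = new Path("/user/zebra/table1");   // hypothetical
    // Two threads drop different column groups of the same table at once;
    // each call retries internally if it races with the other.
    Thread t1 = new Thread(new Runnable() {
      public void run() {
        try { BasicTable.dropColumnGroup(table, conf, "CG0"); }
        catch (IOException e) { /* handle or log */ }
      }
    });
    Thread t2 = new Thread(new Runnable() {
      public void run() {
        try { BasicTable.dropColumnGroup(table, conf, "CG1"); }
        catch (IOException e) { /* handle or log */ }
      }
    });
    t1.start(); t2.start();
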
@@ -271,7 +294,8 @@
         schema = schemaFile.getLogical();
         projection = new Projection(schema);
         String storage = schemaFile.getStorageString();
-        partition = new Partition(schema, projection, storage);
+        String comparator = schemaFile.getComparator();
+        partition = new Partition(schema, projection, storage, comparator);
         for (int nx = 0; nx < numCGs; nx++) {
           if (!schemaFile.isCGDeleted(nx)) {
             colGroups[nx] =
@@ -332,6 +356,21 @@
     }
 
     /**
+     * @return the list of sorted columns
+     */
+    public SortInfo getSortInfo()
+    {
+      return schemaFile.getSortInfo();
+    }
+    
+    /**
+     * @return the name of i-th column group 
+     */
+    public String getName(int i) {
+      return schemaFile.getName(i);
+    }
+
+    /**
      * Set the projection for the reader. This will affect calls to
      * {@link #getScanner(RangeSplit, boolean)},
      * {@link #getScanner(BytesWritable, BytesWritable, boolean)},
@@ -351,7 +390,7 @@
         this.projection = new Projection(schemaFile.getLogical());
         partition =
             new Partition(schemaFile.getLogical(), this.projection, schemaFile
-                .getStorageString());
+                .getStorageString(), schemaFile.getComparator());
       }
       else {
         /**
@@ -362,7 +401,7 @@
             new Projection(schemaFile.getLogical(), projection);
         partition =
             new Partition(schemaFile.getLogical(), this.projection, schemaFile
-                .getStorageString());
+                .getStorageString(), schemaFile.getComparator());
       }
       inferredMapping = false;
     }
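
For illustration, a hedged sketch of setting a projection on a reader (column names are hypothetical; passing null selects all columns):

    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    // Subsequent scanners will only materialize these two columns.
    reader.setProjection("FirstName, Department");
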
@@ -389,13 +428,31 @@
       BlockDistribution bd = new BlockDistribution();
       for (int nx = 0; nx < colGroups.length; nx++) {
         if (!isCGDeleted(nx)) {
-          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split
-            .get(nx)));
+          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
         }
       }
       return bd;
     }
 
+
+    /**
+     * Given a row-based split, calculate how the file data that fall into the split
+     * are distributed among hosts.
+     * 
+     * @param split The row-based split. <i>Cannot</i> be null.
+     * @return An object that conveys how blocks that fall into the split are
+     *         distributed across hosts.
+     * @see #rowSplit(int)
+     */
+    public BlockDistribution getBlockDistribution(RowSplit split)
+        throws IOException {
+      BlockDistribution bd = new BlockDistribution();
+      int cgIdx = split.getCGIndex();      
+      bd.add(colGroups[cgIdx].getBlockDistribution(split.getCGRowSplit()));
+      
+      return bd;
+    }
+
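
A hedged sketch of how an InputFormat layer might consume this (rowSplit is assumed to come from rowSplit() further below):

    BlockDistribution bd = reader.getBlockDistribution(rowSplit);
    // bd aggregates, per host, how many bytes of the split's column group
    // each host stores; a scheduler can use it to pick the best locations
    // for the corresponding input split.
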
     /**
      * Collect some key samples and use them to partition the table. Only
      * applicable to sorted BasicTable. The returned {@link KeyDistribution}
@@ -415,7 +472,7 @@
            kd.add(colGroups[nx].getKeyDistribution(n));
         }
       }
-      if (kd.size() > (int) (n * 1.5)) {
+      if (n >= 0 && kd.size() > (int) (n * 1.5)) {
         kd.resize(n);
       }
       return kd;
@@ -473,6 +530,26 @@
     }
 
     /**
+     * Get a scanner that reads a consecutive number of rows as defined in the
+     * {@link RowSplit} object.
+     * 
+     * @param closeReader
+     *          close the underlying Reader object when we close the scanner.
+     *          Should be set to true if we have only one scanner on top of the
+     *          reader, so that resources are released when the scanner is
+     *          closed.
+     * @param rowSplit split based on row numbers.
+     * 
+     * @return A scanner object.
+     * @throws IOException
+     */
+    public synchronized TableScanner getScanner(boolean closeReader,
+                                                RowSplit rowSplit) 
+      throws IOException, ParseException {
+      checkInferredMapping();
+      return new BTScanner(rowSplit, closeReader, partition);
+    }
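
For illustration, a hedged scan loop over such a scanner (assuming the TableScanner methods atEnd()/getKey()/getValue()/advance() used elsewhere in this package):

    TableScanner scanner = reader.getScanner(true, rowSplit);
    BytesWritable key = new BytesWritable();
    Tuple row = TypesUtils.createTuple(scanner.getSchema());
    while (!scanner.atEnd()) {
      scanner.getKey(key);     // row key at the current position
      scanner.getValue(row);   // projected columns of the current row
      // ... process key/row ...
      scanner.advance();
    }
    scanner.close();
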
+    /**
      * Get the schema of the table. The schema may be different from
      * {@link BasicTable.Reader#getSchema(Path, Configuration)} if a projection
      * has been set on the table.
@@ -506,61 +583,106 @@
     public String getPath() {
       return path.toString();
     }
+    
+    /**
+     * Get the path filter used by the table.
+     */
+    public PathFilter getPathFilter(Configuration conf) {
+      ColumnGroup.CGPathFilter filter = new ColumnGroup.CGPathFilter();
+      ColumnGroup.CGPathFilter.setConf(conf);
+      return filter;
+    }
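
For illustration, a hedged sketch of using the filter with directory listings (cgPath is hypothetical):

    FileSystem fs = FileSystem.get(conf);
    // Lists only data files, skipping meta, schema and temporary files.
    FileStatus[] dataFiles =
        fs.listStatus(cgPath, reader.getPathFilter(conf));
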
 
     /**
      * Split the table into at most n parts.
      * 
-     * @param n
-     *          Maximum number of parts in the output list.
+     * @param n Maximum number of parts in the output list.
      * @return A list of RangeSplit objects, each of which can be used to
      *         construct TableScanner later.
      */
-    @SuppressWarnings("unchecked")
     public List<RangeSplit> rangeSplit(int n) throws IOException {
-      // assume all CGs will be split into the same number of horizontal
-      // slices
-      List<CGRangeSplit>[] cgSplitsAll = new ArrayList[colGroups.length];
-      // split each CG
-      for (int nx = 0; nx < colGroups.length; nx++) {
-        if (!isCGDeleted(nx))
-          cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
-      }
-
-      // verify all CGs have same number of slices
-      int numSlices = -1;
-      for (int nx = 0; nx < cgSplitsAll.length; nx++) {
-        if (isCGDeleted(nx)) {
-          continue;
-        }
-        if (numSlices < 0) {
-          numSlices = cgSplitsAll[nx].size();
-        }
-        else if (cgSplitsAll[nx].size() != numSlices) {
-          throw new IOException(
-              "BasicTable's column groups were not equally split.");
+      // Use the first non-deleted column group to do the split; the other
+      // column groups are split exactly the same way.
+      List<RangeSplit> ret;
+      if (firstValidCG >= 0) {
+        List<CGRangeSplit> cgSplits = colGroups[firstValidCG].rangeSplit(n);
+        int numSlices = cgSplits.size();
+        ret = new ArrayList<RangeSplit>(numSlices);
+        for (int slice = 0; slice < numSlices; slice++) {
+          CGRangeSplit oneSliceSplit = cgSplits.get(slice);
+          ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplit));
         }
+
+        return ret;
+      } else { // all column groups are dropped.
+        ret = new ArrayList<RangeSplit>(1);
+        // add a dummy split
+        ret.add(new BasicTable.Reader.RangeSplit(new CGRangeSplit(0, 0)));
+        return ret;
       }
-      if (numSlices <= 0) {
-        // This could happen because of various reasons.
-        // One possibility is that all the CGs are deleted.
-        numSlices = 1;
-      }
-      // return horizontal slices as RangeSplits
-      List<RangeSplit> ret = new ArrayList<RangeSplit>(numSlices);
+    }
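
A hedged usage sketch pairing rangeSplit with the RangeSplit-based scanner (the split count of 10 is arbitrary):

    List<RangeSplit> splits = reader.rangeSplit(10);
    for (RangeSplit split : splits) {
      TableScanner scanner = reader.getScanner(split, false);
      // ... scan all rows of this horizontal slice ...
      scanner.close();
    }
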
+
+    /**
+     * We already use FileInputFormat to create byte offset-based input splits.
+     * Their information is encoded in starts, lengths and paths. This method
+     * wraps that information into RowSplit objects at the basic table level.
+     * 
+     * @param starts array of starting byte of fileSplits.
+     * @param lengths array of length of fileSplits.
+     * @param paths array of path of fileSplits.
+     * @param splitCGIndex index of column group that is used to create fileSplits.
+     * @return A list of RowSplit objects, each of which can be used to
+     *         construct a TableScanner later. 
+     *         
+     */
+    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex) throws IOException {
+      List<RowSplit> ret; 
+      
+      List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths);
+      int numSlices = cgSplits.size();
+      ret = new ArrayList<RowSplit>(numSlices);
       for (int slice = 0; slice < numSlices; slice++) {
-        CGRangeSplit[] oneSliceSplits = new CGRangeSplit[cgSplitsAll.length];
-        for (int cgIndex = 0; cgIndex < cgSplitsAll.length; cgIndex++) {
-          if (isCGDeleted(cgIndex)) {
-            // set a dummy split
-            oneSliceSplits[cgIndex] = new CGRangeSplit(0, 0);
-          } else {
-            oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
-          }
-        }
-        ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplits));
+        CGRowSplit cgRowSplit = cgSplits.get(slice);
+        ret.add(new BasicTable.Reader.RowSplit(splitCGIndex, cgRowSplit));
       }
+        
       return ret;
     }
+    
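
A hedged sketch of wiring FileInputFormat output into this method (fileSplits is an assumed List<FileSplit> computed over the split column group):

    int cgIndex = reader.getRowSplitCGIndex();
    long[] starts  = new long[fileSplits.size()];
    long[] lengths = new long[fileSplits.size()];
    Path[] paths   = new Path[fileSplits.size()];
    for (int i = 0; i < fileSplits.size(); i++) {
      FileSplit fileSplit = fileSplits.get(i);
      starts[i]  = fileSplit.getStart();    // starting byte offset
      lengths[i] = fileSplit.getLength();   // length in bytes
      paths[i]   = fileSplit.getPath();     // TFile the split falls in
    }
    List<RowSplit> rowSplits =
        reader.rowSplit(starts, lengths, paths, cgIndex);
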
+
+    /** 
+     * Get index of the column group that will be used for row-based split. 
+     * 
+     */
+    public int getRowSplitCGIndex() {
+      // Try to find the largest non-deleted and used column group by projection;
+      int largestCGIndex = -1;
+      int splitCGIndex = -1;
+      long largestCGSize = -1;
+      for (int i=0; i<colGroups.length; i++) {
+        if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
+          continue;
+        }
+        ColumnGroup.Reader reader = colGroups[i];
+        BasicTableStatus btStatus = reader.getStatus();
+        long size = btStatus.getSize();
+        if (size > largestCGSize) {
+          largestCGIndex = i;
+          largestCGSize = size;
+        }
+      }
+     
+      /* We have a largest non-deleted and used column group;
+         use it to do the split. */
+      if (largestCGIndex >= 0) { 
+        splitCGIndex = largestCGIndex;
+      } else if (firstValidCG >= 0) { /* If all projection columns are deleted or non-existent,
+                                         use the first non-deleted column group, if any, to do the split. */
+        splitCGIndex = firstValidCG; 
+      } 
+     
+      return splitCGIndex;
+    }
+
 
     /**
      * Close the BasicTable for reading. Resources are released.
@@ -642,10 +764,11 @@
      * implementation-dependent.
      */
     public static class RangeSplit implements Writable {
-      CGRangeSplit[] slice;
+      //CGRangeSplit[] slice;
+      CGRangeSplit slice;
 
-      RangeSplit(CGRangeSplit[] splits) {
-        slice = splits;
+      RangeSplit(CGRangeSplit split) {
+        slice = split;
       }
 
       /**
@@ -660,12 +783,10 @@
        */
       @Override
       public void readFields(DataInput in) throws IOException {
-        int count = Utils.readVInt(in);
-        slice = new CGRangeSplit[count];
-        for (int nx = 0; nx < count; nx++) {
+        for (int nx = 0; nx < 1; nx++) {
           CGRangeSplit cgrs = new CGRangeSplit();
           cgrs.readFields(in);
-          slice[nx] = cgrs;
+          slice = cgrs;
         }
       }
 
@@ -674,18 +795,81 @@
        */
       @Override
       public void write(DataOutput out) throws IOException {
-        Utils.writeVInt(out, slice.length);
-        for (CGRangeSplit split : slice) {
-          split.write(out);
-        }
+        //Utils.writeVInt(out, slice.length);
+        //for (CGRangeSplit split : slice) {
+        //  split.write(out);
+        //}
+        slice.write(out);
       }
 
-      CGRangeSplit get(int index) {
-        return slice[index];
+      //CGRangeSplit get(int index) {
+       // return slice[index];
+      //}
+      
+      CGRangeSplit getCGRangeSplit() {
+        return slice;
       }
     }
 
     /**
+     * A row-based split on the zebra table.
+     */
+    public static class RowSplit implements Writable {
+      int cgIndex;  // column group index where the split lies
+      CGRowSplit slice; 
+
+      RowSplit(int cgidx, CGRowSplit split) {
+        this.cgIndex = cgidx;
+        this.slice = split;
+      }
+
+      /**
+       * Default constructor.
+       */
+      public RowSplit() {
+        // no-op
+      }
+      
+      @Override
+      public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{cgIndex = " + cgIndex + "}\n");
+        sb.append(slice.toString());
+        
+        return sb.toString();
+      }
+
+      /**
+       * @see Writable#readFields(DataInput)
+       */
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        this.cgIndex = Utils.readVInt(in);
+        CGRowSplit cgrs = new CGRowSplit();
+        cgrs.readFields(in);
+        this.slice = cgrs;
+      }
+
+      /**
+       * @see Writable#write(DataOutput)
+       */
+      @Override
+      public void write(DataOutput out) throws IOException {
+        Utils.writeVInt(out, cgIndex);
+        slice.write(out);
+      }
+      
+      int getCGIndex() {
+        return cgIndex;
+      }
+
+      CGRowSplit getCGRowSplit() {
+        return slice;
+      }
+    }
+    
+    
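
Since RowSplit implements Writable, it can be embedded in an InputSplit and shipped to tasks. A hedged round-trip sketch using in-memory java.io streams:

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    rowSplit.write(new DataOutputStream(bos));     // serialize
    RowSplit copy = new RowSplit();                // public no-arg ctor
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));   // deserialize
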
+    /**
      * BasicTable scanner class
      */
     private class BTScanner implements TableScanner {
@@ -704,74 +888,106 @@
       }
 
       public BTScanner(BytesWritable beginKey, BytesWritable endKey,
-          boolean closeReader, Partition partition) throws IOException {
-        this.partition = partition;
-        boolean anyScanner = false;
-        try {
-          schema = partition.getProjection();
-          cgScanners = new TableScanner[colGroups.length];
-          for (int i = 0; i < colGroups.length; ++i) {
-            if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
-            {
-              anyScanner = true;
-              cgScanners[i] = colGroups[i].getScanner(beginKey, endKey, false);
-            } else
-              cgScanners[i] = null;
-          }
-          if (!anyScanner && firstValidCG >= 0) {
-            // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            cgScanners[firstValidCG] = colGroups[firstValidCG].
-                                         getScanner(beginKey, endKey, false);
-          }
-          this.closeReader = closeReader;
-          sClosed = false;
-        }
-        catch (Exception e) {
-          throw new IOException("BTScanner constructor failed : "
-              + e.getMessage());
+        boolean closeReader, Partition partition) throws IOException {
+        init(null, null, beginKey, endKey, closeReader, partition);
+      }
+      
+      public BTScanner(RangeSplit split, Partition partition,
+        boolean closeReader) throws IOException {
+        init(null, split, null, null, closeReader, partition);
+      }
+      
+      public BTScanner(RowSplit rowSplit,  boolean closeReader, 
+                       Partition partition) throws IOException {
+        init(rowSplit, null, null, null, closeReader, partition);
+      }      
+      
+      /**
+       * Creates a new CGRowSplit. If startRow in rowSplit is not set 
+       * (i.e. < 0), startRow and numRows are filled in based on 'startByte' 
+       * and 'numBytes' of the given rowSplit.
+       */
+      private CGRowSplit makeCGRowSplit(RowSplit rowSplit) throws IOException {
+        CGRowSplit inputCGSplit = rowSplit.getCGRowSplit(); 
+
+        int cgIdx = rowSplit.getCGIndex();
+        
+        CGRowSplit cgSplit = new CGRowSplit();
+        cgSplit.fileIndex = inputCGSplit.fileIndex;
+        // startByte and numBytes from inputCGSplit are ignored, since
+        // they make sense for only one CG.
+        cgSplit.startRow = inputCGSplit.startRow;
+        cgSplit.numRows = inputCGSplit.numRows;
+        
+        if (cgSplit.startRow >= 0) {
+          //assume the rows are already set up.
+          return cgSplit;
         }
-        finally {
-          if (sClosed) {
-            if (cgScanners != null) {
-              for (int i = 0; i < cgScanners.length; ++i) {
-                if (cgScanners[i] != null) {
-                  try {
-                    cgScanners[i].close();
-                    cgScanners[i] = null;
-                  }
-                  catch (Exception e) {
-                    // no-op
-                  }
-                }
-              }
-            }
-          }
+        
+        // Find the row range :
+        if (isCGDeleted(cgIdx)) {
+          throw new IOException("CG " + cgIdx + " is deleted.");
         }
+        
+        //fill the row numbers.
+        colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit.startByte,
+                                      inputCGSplit.numBytes);
+        return cgSplit;
       }
-
-      public BTScanner(RangeSplit split, Partition partition,
-          boolean closeReader) throws IOException {
+    
+      // Helper function for initialization.
+      private TableScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit, 
+                                           RangeSplit rangeSplit,
+                                           BytesWritable beginKey, 
+                                           BytesWritable endKey) 
+                      throws IOException, ParseException {        
+        if (cgRowSplit != null) {
+          return colGroups[cgIndex].getScanner(false, cgRowSplit);
+        }      
+        if (beginKey != null || endKey != null) {
+          return colGroups[cgIndex].getScanner(beginKey, endKey, false);
+        }
+        return colGroups[cgIndex].getScanner
+                ((rangeSplit == null ? null : rangeSplit.getCGRangeSplit()), 
+                 false);
+      }
+      
+      /**
+       * If rowSplit is not null, scanners are created based on the
+       * row range. <br>
+       * Otherwise, if rangeSplit is not null, scanners are based on the range; <br>
+       * otherwise, they are based on the begin/end keys.
+       */
+      private void init(RowSplit rowSplit, RangeSplit rangeSplit,
+                   BytesWritable beginKey, BytesWritable endKey, 
+                   boolean closeReader, Partition partition) throws IOException {
+        this.partition = partition;
+        boolean anyScanner = false;
+        
+        CGRowSplit cgRowSplit = null;
+        if (rowSplit != null) {
+          cgRowSplit = makeCGRowSplit(rowSplit);
+        }
+        
         try {
           schema = partition.getProjection();
           cgScanners = new TableScanner[colGroups.length];
-          boolean anyScanner = false;
           for (int i = 0; i < colGroups.length; ++i) {
-            // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            if (!isCGDeleted(i) && partition.isCGNeeded(i))
+            if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
             {
-              cgScanners[i] =
-                  colGroups[i].getScanner(split == null ? null : split.get(i),
-                      false);
               anyScanner = true;
+              cgScanners[i] = createCGScanner(i, cgRowSplit, rangeSplit,
+                                              beginKey, endKey);                                                             
             } else
               cgScanners[i] = null;
           }
           if (!anyScanner && firstValidCG >= 0) {
             // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            cgScanners[firstValidCG] = colGroups[firstValidCG].
-              getScanner(split == null ? null : split.get(firstValidCG), false);
+            cgScanners[firstValidCG] = createCGScanner(firstValidCG, cgRowSplit, 
+                                                       rangeSplit,
+                                                       beginKey, endKey);            
           }
-          this.partition = partition;
           this.closeReader = closeReader;
           sClosed = false;
         }
@@ -1014,6 +1230,8 @@
     private boolean closed = true;
     ColumnGroup.Writer[] colGroups;
     Partition partition;
+    boolean sorted;
+    private boolean finished;
     Tuple[] cgTuples;
 
     /**
@@ -1039,35 +1257,34 @@
      *          implementation, the schema of a table is a comma or
      *          semicolon-separated list of column names, such as
      *          "FirstName, LastName; Sex, Department".
-     * @param sorted
-     *          Whether the table to be created is sorted or not. If set to
-     *          true, we expect the rows inserted by every inserter created from
-     *          this Writer must be sorted. Additionally, there exists an
-     *          ordering of the inserters Ins-1, Ins-2, ... such that the rows
-     *          created by Ins-1, followed by rows created by Ins-2, ... form a
-     *          total order.
+     * @param sortColumns
+     *          String of comma-separated sorted columns: null for unsorted tables
+     * @param comparator
+     *          Name of the comparator used in sorted tables
      * @param conf
      *          Optional Configuration objects.
      * 
      * @throws IOException
      * @see Schema
      */
-    public Writer(Path path, String btSchemaString, String btStorageString,
-        boolean sorted, Configuration conf) throws IOException {
+    public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
+        String comparator, Configuration conf) throws IOException {
       try {
         schemaFile =
-            new SchemaFile(path, btSchemaString, btStorageString,
-                DEFAULT_COMPARATOR, sorted, conf);
+            new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
+                comparator, conf);
         partition = schemaFile.getPartition();
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         colGroups = new ColumnGroup.Writer[numCGs];
         cgTuples = new Tuple[numCGs];
+        sorted = schemaFile.isSorted();
         for (int nx = 0; nx < numCGs; nx++) {
           colGroups[nx] =
               new ColumnGroup.Writer( 
                  new Path(path, schemaFile.getName(nx)),
             		 schemaFile.getPhysicalSchema(nx), 
             		 sorted, 
+                 comparator,
             		 schemaFile.getName(nx),
             		 schemaFile.getSerializer(nx), 
             		 schemaFile.getCompressor(nx), 
@@ -1113,7 +1330,16 @@
     }
 
     /**
-     * Reopen an already created BasicTable for writing. Excepiton will be
+     * a wrapper to support backward compatible constructor
+     */
+    public Writer(Path path, String btSchemaString, String btStorageString,
+        Configuration conf) throws IOException {
+      this(path, btSchemaString, btStorageString, null, null, conf);
+    }
+
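
A hedged sketch contrasting the new constructor with the backward-compatible 4-argument wrapper above (schema, storage and sort column strings below are hypothetical; a null comparator is assumed to select the default):

    // Sorted table: rows inserted must be ordered by FirstName.
    BasicTable.Writer sortedWriter =
        new BasicTable.Writer(path, "FirstName, LastName",
            "[FirstName]; [LastName]", "FirstName", null, conf);

    // Unsorted table through the 4-argument wrapper.
    BasicTable.Writer plainWriter =
        new BasicTable.Writer(path2, "FirstName, LastName",
            "[FirstName]; [LastName]", conf);
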
+    /**
+     * Reopen an already created BasicTable for writing. Exception will be
      * thrown if the table is already closed, or is in the process of being
      * closed.
      */
@@ -1122,6 +1348,7 @@
         schemaFile = new SchemaFile(path, conf);
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
+        sorted = schemaFile.isSorted();
         colGroups = new ColumnGroup.Writer[numCGs];
         cgTuples = new Tuple[numCGs];
         for (int nx = 0; nx < numCGs; nx++) {
@@ -1167,8 +1394,8 @@
      * make the table immutable.
      */
     public void finish() throws IOException {
-      if (closed) return;
-      closed = true;
+      if (finished) return;
+      finished = true;
       try {
         for (int nx = 0; nx < colGroups.length; nx++) {
           if (colGroups[nx] != null) {
@@ -1203,6 +1430,8 @@
     public void close() throws IOException {
       if (closed) return;
       closed = true;
+      if (!finished)
+        finish();
       try {
         for (int nx = 0; nx < colGroups.length; nx++) {
           if (colGroups[nx] != null) {
@@ -1237,6 +1466,22 @@
     public Schema getSchema() {
       return schemaFile.getLogical();
     }
+    
+    /**
+     * @return whether the table is sorted
+     */
+    public boolean isSorted() {
+      return sorted;
+    }
+
+    /**
+     * Get the list of sorted columns.
+     * @return the list of sorted columns
+     */
+    public SortInfo getSortInfo()
+    {
+      return schemaFile.getSortInfo();
+    }
 
     /**
     * Get an inserter with a given name.
@@ -1369,7 +1614,7 @@
           }
           if (finishWriter) {
             try {
-              BasicTable.Writer.this.close();
+              BasicTable.Writer.this.finish();
             }
             catch (Exception e) {
               // no-op
@@ -1401,6 +1646,7 @@
     Schema[] physical;
     Partition partition;
     boolean sorted;
+    SortInfo sortInfo = null;
     String storage;
     CGSchema[] cgschemas;
     
@@ -1419,17 +1665,21 @@
     }
 
     // ctor for writing
-    public SchemaFile(Path path, String btSchemaStr, String btStorageStr,
-        String btComparator, boolean sorted, Configuration conf)
+    public SchemaFile(Path path, String btSchemaStr, String btStorageStr, String sortColumns,
+        String btComparator, Configuration conf)
         throws IOException {
       storage = btStorageStr;
-      this.comparator = btComparator;
       try {
-        partition = new Partition(btSchemaStr, btStorageStr);
+        partition = new Partition(btSchemaStr, btStorageStr, btComparator, sortColumns);
       }
       catch (Exception e) {
         throw new IOException("Partition constructor failed :" + e.getMessage());
       }
+      this.sortInfo = partition.getSortInfo();
+      this.sorted = partition.isSorted();
+      this.comparator = (this.sortInfo == null ? null : this.sortInfo.getComparator());
+      if (this.comparator == null)
+        this.comparator = "";
       logical = partition.getSchema();
       cgschemas = partition.getCGSchemas();
       physical = new Schema[cgschemas.length];
@@ -1437,7 +1687,7 @@
         physical[nx] = cgschemas[nx].getSchema();
       }
       cgDeletedFlags = new boolean[physical.length];
-      this.sorted = sorted;
+
       version = SCHEMA_VERSION;
 
       // write out the schema
@@ -1456,6 +1706,10 @@
       return sorted;
     }
 
+    public SortInfo getSortInfo() {
+      return sortInfo;
+    }
+
     public Schema getLogical() {
       return logical;
     }
@@ -1480,9 +1734,9 @@
       return cgschemas[nx].getCompressor();
     }
 
-    /** 
-     * Returns the index for CG with the given name.
-     * -1 indicates that there is no CG with the name.
+    /**
+     * Returns the index for CG with the given name. -1 indicates that there is
+     * no CG with the name.
      */
     int getCGByName(String cgName) {
       for(int i=0; i<physical.length; i++) {
@@ -1538,6 +1792,15 @@
         WritableUtils.writeString(outSchema, physical[nx].toString());
       }
       WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+      WritableUtils.writeVInt(outSchema, sortInfo == null ? 0 : sortInfo.size());
+      if (sortInfo != null && sortInfo.size() > 0)
+      {
+        String[] sortedCols = sortInfo.getSortColumnNames();
+        for (int i = 0; i < sortInfo.size(); i++)
+        {
+          WritableUtils.writeString(outSchema, sortedCols[i]);
+        }
+      }
       outSchema.close();
     }
 
@@ -1566,7 +1829,7 @@
       }
       storage = WritableUtils.readString(in);
       try {
-        partition = new Partition(logicalStr, storage);
+        partition = new Partition(logicalStr, storage, comparator);
       }
       catch (Exception e) {
         throw new IOException("Partition constructor failed :" + e.getMessage());
@@ -1589,9 +1852,48 @@
       }
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
       setCGDeletedFlags(path, conf);
+      if (version.compareTo(new Version((short)1, (short)0)) > 0)
+      {
+        int numSortColumns = WritableUtils.readVInt(in);
+        if (numSortColumns > 0)
+        {
+          String[] sortColumnStr = new String[numSortColumns];
+          for (int i = 0; i < numSortColumns; i++)
+          {
+            sortColumnStr[i] = WritableUtils.readString(in);
+          }
+          sortInfo = SortInfo.parse(SortInfo.toSortString(sortColumnStr), logical, comparator);
+        }
+      }
       in.close();
     }
 
+    private static int getNumCGs(Path path, Configuration conf) throws IOException {
+      Path pathSchema = makeSchemaFilePath(path);
+      if (!path.getFileSystem(conf).exists(pathSchema)) {
+        throw new IOException("BT Schema file doesn't exist: " + pathSchema);
+      }
+      // read schema file
+      FSDataInputStream in = path.getFileSystem(conf).open(pathSchema);
+      Version version = new Version(in);
+      // verify compatibility against SCHEMA_VERSION
+      if (!version.compatibleWith(SCHEMA_VERSION)) {
+        throw new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION
+            + "; found in file: " + version);
+      }
+      
+      // read comparator
+      WritableUtils.readString(in);
+      // read logicalStr
+      WritableUtils.readString(in);
+      // read storage
+      WritableUtils.readString(in);
+      int numCGs = WritableUtils.readVInt(in);
+      in.close();
+
+      return numCGs;
+    }
+
     private static Path makeSchemaFilePath(Path parent) {
       return new Path(parent, BT_SCHEMA_FILE);
     }
@@ -1607,23 +1909,24 @@
       
       for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
         if (!file.isDir()) {
-           String fname =  file.getPath().getName();
-           if (fname.startsWith(DELETED_CG_PREFIX)) {
-             deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
-           }
+          String fname =  file.getPath().getName();
+          if (fname.startsWith(DELETED_CG_PREFIX)) {
+            deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+          }
         }
       }
       
       for(int i=0; i<physical.length; i++) {
-        cgDeletedFlags[i] = 
-          deletedCGs.contains(getName(i));
+        cgDeletedFlags[i] = deletedCGs.contains(getName(i));
       }
     }
+    
+    
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf)
       throws IOException {
-      dumpInfo(file, out, conf, 0);
+    dumpInfo(file, out, conf, 0);
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf, int indent)
@@ -1633,10 +1936,25 @@
     Path path = new Path(file);
     try {
       BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+      String schemaStr = reader.getBTSchemaString();
+      String storageStr = reader.getStorageString();
       IOutils.indent(out, indent);
-      out.printf("Schema : %s\n", reader.getBTSchemaString());
+      out.printf("Schema : %s\n", schemaStr);
       IOutils.indent(out, indent);
-      out.printf("Storage Information : %s\n", reader.getStorageString());
+      out.printf("Storage Information : %s\n", storageStr);
+      SortInfo sortInfo = reader.getSortInfo();
+      if (sortInfo != null && sortInfo.size() > 0)
+      {
+        IOutils.indent(out, indent);
+        String[] sortedCols = sortInfo.getSortColumnNames();
+        out.println("Sorted Columns :");
+        for (int nx = 0; nx < sortedCols.length; nx++) {
+          if (nx > 0)
+            out.printf(" , ");
+          out.printf("%s", sortedCols[nx]);
+        }
+        out.printf("\n");
+      }
       IOutils.indent(out, indent);
       out.println("Column Groups within the Basic Table :");
       for (int nx = 0; nx < reader.colGroups.length; nx++) {

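For illustration, a hedged sketch of invoking the dump (the table path is hypothetical):

    // Prints schema, storage information, sorted columns and per-CG
    // details of the table to stdout.
    BasicTable.dumpInfo("/user/zebra/table1", System.out, new Configuration());
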
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java Tue Nov 24 19:54:19 2009
@@ -22,7 +22,7 @@
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
 
 /**
  * Status of a BasicTable. The status may be reported under some projection.

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Tue Nov 24 19:54:19 2009
@@ -37,6 +37,8 @@
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.fs.BlockLocation;
@@ -48,10 +50,10 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.ByteArray;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.ByteArray;
+import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.types.CGSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
@@ -78,6 +80,8 @@
  *      </ul>
  */
 class ColumnGroup {
+  static Log LOG = LogFactory.getLog(ColumnGroup.class);
+  
   private final static String CONF_COMPRESS = "table.output.tfile.compression";
   private final static String DEFAULT_COMPRESS = "gz";
   private final static String CONF_MIN_BLOCK_SIZE = "table.tfile.minblock.size";
@@ -156,7 +160,10 @@
   static CGIndex buildIndex(FileSystem fs, Path path, boolean dirty,
       Configuration conf) throws IOException {
     CGIndex ret = new CGIndex();
-    FileStatus[] files = fs.listStatus(path, new CGPathFilter(conf));
+    CGPathFilter cgPathFilter = new CGPathFilter();
+    CGPathFilter.setConf(conf);
+    FileStatus[] files = fs.listStatus(path, cgPathFilter);
+    
     Comparator<RawComparable> comparator = null;
     for (FileStatus f : files) {
       if (dirty) {
@@ -194,9 +201,15 @@
     }
 
     ret.sort(comparator);
+    
+    int idx = 0;
+    for (CGIndexEntry e : ret.getIndex()) {
+      e.setIndex(idx++);    
+    }
+    
     return ret;
-  }
-
+  }   
+  
   /**
    * ColumnGroup reader.
    */
@@ -266,7 +279,8 @@
       }
       projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
       Path metaFilePath = makeMetaFilePath(path);
-      if (!fs.exists(metaFilePath)) {
+      /* If the index file does not exist, or we are loading from an unsorted table. */
+      if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
         // special case for unsorted CG that did not create index properly.
         if (cgschema.isSorted()) {
           throw new FileNotFoundException(
@@ -421,14 +435,31 @@
         throw new IllegalArgumentException("Illegal range split");
       }
 
-      if (split.len == 0) {
-        throw new IOException("Zero-length range split");
-      }
-
       return new CGScanner(split, closeReader);
     }
 
     /**
+     * Get a scanner that reads the rows defined by rowRange. 
+     * 
+     * @param closeReader
+     *          close the underlying Reader object when we close the scanner.
+     *          Should be set to true if we have only one scanner on top of the
+     *          reader, so that resources are released when the scanner is
+     *          closed.
+     * @param rowSplit specifies part index, start row, and end row.
+     * @return A scanner object.
+     */
+    public synchronized TableScanner getScanner(boolean closeReader, 
+                                                CGRowSplit rowSplit)
+                        throws IOException, ParseException {
+      if (closed) {
+        throw new EOFException("Reader already closed");
+      }
+      
+      return new CGScanner(rowSplit, closeReader);
+    }
+     
+    /**
      * Given a split range, calculate how the file data that fall into the range
      * are distributed among hosts.
      * 
@@ -461,7 +492,69 @@
 
       return ret;
     }
+    
+    /**
+     * Given a row range, calculate how the file data that fall into the range
+     * are distributed among hosts.
+     * 
+     * @param split
+     *          The row-based split. Must not be null.
+     * @return a map from host name to the amount of data (in bytes) the host
+     *         owns that fall roughly into the key range.
+     */
+    public BlockDistribution getBlockDistribution(CGRowSplit split)
+        throws IOException {
+      if (split == null) {
+        throw new IOException("Row-based split cannot be null for getBlockDistribution()");
+      }
+
+      BlockDistribution ret = new BlockDistribution();
+      CGIndexEntry entry = cgindex.get(split.fileIndex);
+      FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
+      
+      BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+      for (BlockLocation l : locations) {
+        ret.add(l);
+      }
+      
+      return ret;
+    }
+
+   /**
+    * Sets startRow and number of rows in rowSplit based on
+    * startOffset and length.
+    * 
+    * It is assumed that 'startByte' and 'numBytes' in rowSplit itself
+    * are not valid.
+    */
+    void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
+                      throws IOException {
+
+      Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
+      FileStatus tfile = fs.getFileStatus(tfPath);
+
+      TFile.Reader reader = null;
+      
+      try {
+        reader = new TFile.Reader(fs.open(tfPath),
+                                  tfile.getLen(), conf);
 
+        long startRow = reader.getRecordNumNear(startOffset);
+        long endRow = reader.getRecordNumNear(startOffset + length);
+
+        if (endRow < startRow) {
+          endRow = startRow;
+        }
+
+        rowSplit.startRow = startRow;
+        rowSplit.numRows = endRow - startRow;
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+    
     /**
      * Get a sampling of keys and calculate how data are distributed among
      * key-partitioned buckets. The implementation attempts to calculate all
@@ -637,6 +730,32 @@
     }
 
     /**
+     * We already use FileInputFormat to create byte offset-based input splits.
+     * Their information is encoded in starts, lengths and paths. This method
+     * wraps that information into CGRowSplit objects at the column group level.
+     * 
+     * @param starts array of starting byte of fileSplits.
+     * @param lengths array of length of fileSplits.
+     * @param paths array of path of fileSplits.
+     * @return A list of CGRowSplit objects. 
+     *         
+     */
+    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths) throws IOException {
+      List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
+       
+      for (int i=0; i<starts.length; i++) {
+        long start = starts[i];
+        long length = lengths[i];
+        Path path = paths[i];
+        int idx = cgindex.getFileIndex(path);        
+        
+        lst.add(new CGRowSplit(idx, start, length));
+      }
+      
+      return lst;
+    } 
+    
+    /**
      * Is the ColumnGroup sorted?
      * 
      * @return Whether the ColumnGroup is sorted.
@@ -663,8 +782,9 @@
       TFile.Reader.Scanner scanner;
       TupleReader tupleReader;
 
-      TFileScanner(FileSystem fs, Path path, RawComparable begin,
-          RawComparable end, CGSchema cgschema, Projection projection,
+      TFileScanner(FileSystem fs, Path path, CGRowSplit rowRange, 
+                    RawComparable begin, RawComparable end, 
+                    CGSchema cgschema, Projection projection,
           Configuration conf) throws IOException, ParseException {
         try {
           ins = fs.open(path);
@@ -672,7 +792,17 @@
            * compressor is inside cgschema
            */
           reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
-          scanner = reader.createScanner(begin, end);
+          if (rowRange != null && rowRange.fileIndex >= 0) {
+            scanner = reader.createScannerByRecordNum(rowRange.startRow, 
+                                         rowRange.startRow + rowRange.numRows);
+          } else {
+            /* using deprecated API just so that zebra can work with 
+             * hadoop jar that does not contain HADOOP-6218 (Record ids for
+             * TFile). This is expected to be temporary. Later we should 
+             * use the undeprecated API.
+             */
+            scanner = reader.createScanner(begin, end);
+          }
           /*
            * serializer is inside cgschema: different serializer will require
            * different Reader: for pig, it's TupleReader
@@ -795,9 +925,29 @@
           beginIndex = split.start;
           endIndex = split.start + split.len;
         }
-        init(null, null, closeReader);
+        init(null, null, null, closeReader);
       }
-
+      
+      /**
+       * Scanner for a range specified by the given row range.
+       * 
+       * @param rowRange see {@link CGRowSplit}
+       * @param closeReader
+       */
+      CGScanner(CGRowSplit rowRange, boolean closeReader) 
+                 throws IOException, ParseException {
+        beginIndex = 0;
+        endIndex = cgindex.size();
+        if (rowRange != null && rowRange.fileIndex >= 0) {
+          if (rowRange.fileIndex >= cgindex.size()) {
+            throw new IllegalArgumentException("Part Index is out of range.");
+          }
+          beginIndex = rowRange.fileIndex;
+          endIndex = beginIndex+1;
+        }
+        init(rowRange, null, null, closeReader);
+      }
+      
       CGScanner(RawComparable beginKey, RawComparable endKey,
           boolean closeReader) throws IOException, ParseException {
         beginIndex = 0;
@@ -811,11 +961,12 @@
             ++endIndex;
           }
         }
-        init(beginKey, endKey, closeReader);
+        init(null, beginKey, endKey, closeReader);
       }
 
-      private void init(RawComparable beginKey, RawComparable endKey,
-          boolean doClose) throws IOException, ParseException {
+      private void init(CGRowSplit rowRange, RawComparable beginKey, 
+                        RawComparable endKey, boolean doClose) 
+             throws IOException, ParseException {
         if (beginIndex > endIndex) {
           throw new IllegalArgumentException("beginIndex > endIndex");
         }
@@ -827,8 +978,9 @@
             RawComparable begin = (i == beginIndex) ? beginKey : null;
             RawComparable end = (i == endIndex - 1) ? endKey : null;
             TFileScanner scanner =
-                new TFileScanner(fs, cgindex.getPath(i, path), begin, end,
-                    cgschema, logicalSchema, conf);
+                new TFileScanner(fs, cgindex.getPath(i, path), rowRange, 
+                                 begin, end,
+                                 cgschema, logicalSchema, conf);
             // skip empty scanners.
             if (!scanner.atEnd()) {
               tmpScanners.add(scanner);
@@ -975,7 +1127,55 @@
         Utils.writeVInt(out, len);
       }
     }
+    
+    public static class CGRowSplit implements Writable {
+      int fileIndex = -1;
+      long startByte = -1;
+      long numBytes = -1;
+      long startRow = -1;
+      long numRows = -1;
+
+      CGRowSplit(int fileIdx, long start, long len) {
+        this.fileIndex = fileIdx;
+        this.startByte = start;
+        this.numBytes = len;
+      }
+
+      public CGRowSplit() {
+        // no-op;
+      }
+      
+      @Override
+      public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{fileIndex = " + fileIndex + "}\n");       
+        sb.append("{startByte = " + startByte + "}\n");
+        sb.append("{numBytes = " + numBytes + "}\n");
+        sb.append("{startRow = " + startRow + "}\n");
+        sb.append("{numRows = " + numRows + "}\n");
+        
+        return sb.toString();
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        fileIndex = Utils.readVInt(in);
+        startByte = Utils.readVLong(in);
+        numBytes = Utils.readVLong(in);
+        startRow = Utils.readVLong(in);
+        numRows = Utils.readVLong(in);
+      }
 
+      @Override
+      public void write(DataOutput out) throws IOException {
+        Utils.writeVInt(out, fileIndex);
+        Utils.writeVLong(out, startByte);
+        Utils.writeVLong(out, numBytes);
+        Utils.writeVLong(out, startRow);
+        Utils.writeVLong(out, numRows);
+      }      
+    }
+    
     private static class SplitColumn {
       SplitColumn(Partition.SplitType st) {
         this.st = st;
@@ -1051,7 +1251,7 @@
     Configuration conf;
     FileSystem fs;
     CGSchema cgschema;
-    private boolean finished;
+    private boolean finished, closed;
 
     /**
      * Create a ColumnGroup writer. The semantics are as follows:
@@ -1095,11 +1295,25 @@
     public Writer(Path path, String schema, boolean sorted, String name, String serializer,
         String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
         throws IOException, ParseException {
-      this(path, new Schema(schema), sorted, name, serializer, compressor, owner, group, perm, overwrite,
+      this(path, new Schema(schema), sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
           conf);
     }
     
     public Writer(Path path, Schema schema, boolean sorted, String name, String serializer,
+        String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+        throws IOException, ParseException {
+      this(path, schema, sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
+          conf);
+    }
+
+    public Writer(Path path, String schema, boolean sorted, String comparator, String name, String serializer,
+        String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+        throws IOException, ParseException {
+      this(path, new Schema(schema), sorted, comparator, name, serializer, compressor, owner, group, perm, overwrite,
+          conf);
+    }
+
+    public Writer(Path path, Schema schema, boolean sorted, String comparator, String name, String serializer,
         String compressor, String owner, String group, short perm, boolean overwrite, Configuration conf)
         throws IOException, ParseException {
       this.path = path;
@@ -1118,7 +1332,7 @@
 
       checkPath(path, true);
 
-      cgschema = new CGSchema(schema, sorted, name, serializer, compressor, owner, group, perm);
+      cgschema = new CGSchema(schema, sorted, comparator, name, serializer, compressor, owner, group, perm);
       CGSchema sfNew = CGSchema.load(fs, path);
       if (sfNew != null) {
         // compare input with on-disk schema.
@@ -1162,7 +1376,10 @@
     @Override
     public void close() throws IOException {
       if (!finished) {
-        finished = true;
+        finish();
+      }
+      if (!closed) {
+        closed = true;
         createIndex();
       }
     }
@@ -1196,13 +1413,23 @@
     private void createIndex() throws IOException {
       MetaFile.Writer metaFile =
           MetaFile.createWriter(makeMetaFilePath(path), conf);
-      CGIndex index = buildIndex(fs, path, false, conf);
-      DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-      try {
-        index.write(dos);
-      }
-      finally {
-        dos.close();
+      if (cgschema.isSorted()) {
+        CGIndex index = buildIndex(fs, path, false, conf);
+        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+        try {
+          index.write(dos);
+        }
+        finally {
+          dos.close();
+        }
+      } else { /* Create an empty data meta file for unsorted table. */
+        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+        try {
+          Utils.writeString(dos, "");
+        } 
+        finally {
+          dos.close();
+        }
       }
       metaFile.close();
     }
@@ -1429,18 +1656,19 @@
    * name, first and last key (inclusive) of a data file
    */
   static class CGIndexEntry implements RawComparable, Writable {
+    int index;
     String name;
     long rows;
     RawComparable firstKey;
     RawComparable lastKey;
 
     // for reading
-    CGIndexEntry() {
+    public CGIndexEntry() {
       // no-op
     }
 
     // for writing
-    CGIndexEntry(String name, long rows, RawComparable firstKey,
+    public CGIndexEntry(String name, long rows, RawComparable firstKey,
         RawComparable lastKey) {
       this.name = name;
       this.rows = rows;
@@ -1448,6 +1676,10 @@
       this.lastKey = lastKey;
     }
 
+    public int getIndex() {
+      return index;
+    }
+    
     public String getName() {
       return name;
     }
@@ -1463,6 +1695,10 @@
     public RawComparable getLastKey() {
       return lastKey;
     }
+    
+    void setIndex (int idx) {
+      this.index = idx;
+    }
 
     @Override
     public byte[] buffer() {
@@ -1525,6 +1761,16 @@
       status = new BasicTableStatus();
       index = new ArrayList<CGIndexEntry>();
     }
+    
+    int getFileIndex(Path path) throws IOException {
+      String filename = path.getName();
+      for (CGIndexEntry cgie : index) {
+        if (cgie.getName().equals(filename)) {
+          return cgie.getIndex(); 
+        }
+      }
+      throw new IOException("File " + filename + " is not in the column group index"); 
+    }
 
     int size() {
       return index.size();
@@ -1679,16 +1925,17 @@
     }
   }
 
-  static class CGPathFilter implements PathFilter {
-    private final Configuration conf;
-
-    CGPathFilter(Configuration conf) {
-      this.conf = conf;
+  public static class CGPathFilter implements PathFilter {
+    private static Configuration conf;
+   
+    public static void setConf(Configuration c) {
+      conf = c;
     }
 
     public boolean accept(Path p) {
       return p.getName().equals(META_FILE) || p.getName().equals(SCHEMA_FILE)
           || p.getName().startsWith(".tmp.")
+          || p.getName().startsWith("ttt")
           || p.getName().startsWith(getNonDataFilePrefix(conf)) ? false : true;
     }
   }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Tue Nov 24 19:54:19 2009
@@ -21,7 +21,18 @@
 import java.io.DataOutputStream;
 import java.io.PrintStream;
 
+/**
+ * Helper for Zebra I/O
+ */
 public class IOutils {
+  /**
+   * Write an indent of the given number of spaces.
+   *
+   * @param os
+   *          the print stream to which the indent is written
+   * @param amount
+   *          the number of spaces to indent
+   */
   public static void indent(PrintStream os, int amount)
   {
     for (int i = 0; i < amount; i++)

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Tue Nov 24 19:54:19 2009
@@ -23,7 +23,8 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.ByteArray;
 
 /**
  * Class used to convey the information of how on-disk data are distributed
@@ -149,7 +150,8 @@
    * Get the block distribution of all data that maps to the key bucket.
    */
   public BlockDistribution getBlockDistribution(BytesWritable key) {
-    BlockDistribution bInfo = data.get(key);
+    ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+    BlockDistribution bInfo = data.get(key0);
     if (bInfo == null) {
       throw new IllegalArgumentException("Invalid key");
     }

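The one-line change to getBlockDistribution() above is subtle: the internal
map is keyed by RawComparable with a byte-wise comparator, so a
caller-supplied BytesWritable must first be wrapped in a ByteArray view over
the same bytes. The following self-contained sketch illustrates the
distinction; the TreeMap and comparator are stand-ins for the class's actual
fields, not code from this patch.

    import java.util.Comparator;
    import java.util.TreeMap;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.zebra.tfile.ByteArray;
    import org.apache.hadoop.zebra.tfile.RawComparable;

    public class ByteArrayLookupSketch {
      public static void main(String[] args) {
        // Byte-wise ("memcmp") ordering over RawComparable byte ranges.
        Comparator<RawComparable> memcmp = new Comparator<RawComparable>() {
          public int compare(RawComparable a, RawComparable b) {
            return WritableComparator.compareBytes(a.buffer(), a.offset(),
                a.size(), b.buffer(), b.offset(), b.size());
          }
        };
        TreeMap<RawComparable, String> data =
            new TreeMap<RawComparable, String>(memcmp);
        data.put(new ByteArray(new byte[] {1, 2, 3}), "bucket-1");

        // A BytesWritable is not a RawComparable; wrap its valid byte range,
        // exactly as the patched method now does internally.
        BytesWritable key = new BytesWritable(new byte[] {1, 2, 3});
        RawComparable key0 = new ByteArray(key.get(), 0, key.getSize());
        System.out.println(data.get(key0)); // prints "bucket-1"
      }
    }
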
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java Tue Nov 24 19:54:19 2009
@@ -28,9 +28,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
 
 
 /**

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Tue Nov 24 19:54:19 2009
@@ -93,7 +93,6 @@
   
   /**
    * Get the projection's schema
-   * @return
    */
   public Schema getSchema();
 }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java Tue Nov 24 19:54:19 2009
@@ -16,6 +16,6 @@
  */
 
 /**
- * Physical I/O management of Hadoop Tables.
+ * Physical I/O management of Hadoop Zebra Tables.
  */
 package org.apache.hadoop.zebra.io;

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Tue Nov 24 19:54:19 2009
@@ -30,8 +30,11 @@
 import org.apache.hadoop.zebra.io.TableInserter;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
 
 /**
  * {@link org.apache.hadoop.mapred.OutputFormat} class for creating a
@@ -113,7 +116,10 @@
   private static final String OUTPUT_SCHEMA = "mapred.lib.table.output.schema";
   private static final String OUTPUT_STORAGEHINT =
       "mapred.lib.table.output.storagehint";
-  private static final String OUTPUT_SORTED = "mapred.lib.table.output.sorted";
+  private static final String OUTPUT_SORTCOLUMNS =
+      "mapred.lib.table.output.sortcolumns";
+  private static final String OUTPUT_COMPARATOR =
+      "mapred.lib.table.output.comparator";
 
   /**
    * Set the output path of the BasicTable in JobConf
@@ -174,10 +180,66 @@
     return new Schema(schema);
   }
 
+  private static KeyGenerator makeKeyBuilder(byte[] elems) {
+    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+    for (int i = 0; i < elems.length; ++i) {
+      exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+    }
+    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
+
+  /**
+   * Generates a Zebra-specific key generator that builds BytesWritable sort
+   * keys from the table's sort column(s).
+   *
+   * @param conf
+   *          The JobConf object.
+   * @return Object of type zebra.pig.comparator.KeyGenerator.
+   */
+  public static Object getSortKeyGenerator(JobConf conf) throws IOException, ParseException {
+    SortInfo sortInfo = getSortInfo(conf);
+    Schema schema = getSchema(conf);
+    String[] sortColNames = sortInfo.getSortColumnNames();
+
+    byte[] types = new byte[sortColNames.length];
+    for (int i = 0; i < sortColNames.length; ++i) {
+      types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+    }
+    return makeKeyBuilder(types);
+  }
+
+
+  /**
+   * Generates a BytesWritable sort key for the given tuple, using the key
+   * generator created by getSortKeyGenerator().
+   *
+   * @param builder
+   *         Opaque key generator created by the getSortKeyGenerator() method
+   * @param t
+   *         Tuple to create the sort key from
+   * @return BytesWritable sort key
+   */
+  public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+    KeyGenerator kg = (KeyGenerator) builder;
+    return kg.generateKey(t);
+  }
+
   /**
    * Set the table storage hint in JobConf, should be called after setSchema is
    * called.
+   * <br> <br>
    * 
+   * Note that the "secure by" feature is experimental now and subject to
+   * changes in the future.
+   *
    * @param conf
    *          The JobConf object.
    * @param storehint
@@ -194,7 +256,7 @@
       throw new ParseException("Schema has not been set");
 
     // for sanity check purpose only
-    Partition partition = new Partition(schema, storehint);
+    Partition partition = new Partition(schema, storehint, null);
 
     conf.set(OUTPUT_STORAGEHINT, storehint);
   }
@@ -214,34 +276,84 @@
   }
 
   /**
-   * Set sorted-ness of the table. It is disabled now (by making it package
-   * private). So only unsorted BasicTables may be created for now.
-   * 
-   * TODO: must also allow users to specify customized comparator.
+   * Set the sort info
+   *
+   * @param conf
+   *          The JobConf object.
+   *          
+   * @param sortColumns
+   *          Comma-separated sort column names
+   *          
+   * @param comparator
+   *          comparator class name; null for default
+   *
+   */
+  public static void setSortInfo(JobConf conf, String sortColumns, String comparator) {
+    conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+    if (comparator != null)
+      conf.set(OUTPUT_COMPARATOR, comparator);
+  }
+
+  /**
+   * Set the sort info
+   *
+   * @param conf
+   *          The JobConf object.
+   *          
+   * @param sortColumns
+   *          Comma-separated sort column names
    */
-  public static void setSorted(JobConf conf, boolean sorted) {
-    conf.setBoolean(OUTPUT_SORTED, sorted);
+  public static void setSortInfo(JobConf conf, String sortColumns) {
+    conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+  }
+  
+  /**
+   * Get the SortInfo object 
+   *
+   * @param conf
+   *          The JobConf object.
+   * @return SortInfo object; null if the Zebra table is unsorted 
+   *
+   */
+  public static SortInfo getSortInfo(JobConf conf) throws IOException {
+    String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+    if (sortColumns == null)
+      return null;
+    Schema schema = null;
+    try {
+      schema = getSchema(conf);
+    } catch (ParseException e) {
+      throw new IOException("Schema parsing failure: " + e.getMessage());
+    }
+    if (schema == null)
+      throw new IOException("Schema not defined");
+    String comparator = getComparator(conf);
+    return SortInfo.parse(sortColumns, schema, comparator);
   }
 
   /**
-   * Is the table to be created should be sorted? It is disabled now (by making
-   * it package private).
+   * Get the comparator for the sort columns
+   *
+   * @param conf
+   *          The JobConf object.
+   * @return the comparator string; null if not set
+   *
    */
-  static boolean getSorted(JobConf conf) {
-    return conf.getBoolean(OUTPUT_SORTED, false);
+  private static String getComparator(JobConf conf)
+  {
+    return conf.get(OUTPUT_COMPARATOR);
   }
 
   /**
    * Get the output table as specified in JobConf. It is useful for applications
    * to add more meta data after all rows have been added to the table.
-   * Currently it is disabled (by setting it to package private).
    * 
    * @param conf
    *          The JobConf object.
    * @return The output BasicTable.Writer object.
    * @throws IOException
    */
-  public static BasicTable.Writer getOutput(JobConf conf) throws IOException {
+  private static BasicTable.Writer getOutput(JobConf conf) throws IOException {
     String path = conf.get(OUTPUT_PATH);
     if (path == null) {
       throw new IllegalArgumentException("Cannot find output path");
@@ -268,16 +380,17 @@
     if (schema == null) {
       throw new IllegalArgumentException("Cannot find output schema");
     }
-    String storehint;
+    String storehint, sortColumns, comparator;
     try {
       storehint = getStorageHint(conf);
+      SortInfo sortInfo = getSortInfo(conf);
+      sortColumns =
+          (sortInfo == null) ? null : SortInfo.toSortString(sortInfo.getSortColumnNames());
+      comparator = getComparator(conf);
     }
     catch (ParseException e) {
       throw new IOException(e);
     }
     BasicTable.Writer writer =
-        new BasicTable.Writer(new Path(path), schema, storehint,
-            getSorted(conf), conf); // will
+        new BasicTable.Writer(new Path(path), schema, storehint, sortColumns, comparator, conf);
 
     writer.finish();
   }
@@ -299,14 +412,13 @@
 
   /**
    * Close the output BasicTable, No more rows can be added into the table. A
-   * BasicTable is not visible for reading until it is "closed". This call is
-   * required for sorted TFile, but not required for unsorted TFile.
+   * BasicTable is not visible for reading until it is "closed".
    * 
    * @param conf
    *          The JobConf object.
    * @throws IOException
    */
-  static void close(JobConf conf) throws IOException {
+  public static void close(JobConf conf) throws IOException {
     BasicTable.Writer table = getOutput(conf);
     table.close();
   }

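Taken together, the new sort-related methods suggest a job flow like the
sketch below. This is illustrative rather than code from the patch: the
output path, schema, and column names are invented; setOutputPath is assumed
from the class's existing API (only its javadoc is visible in this diff); and
a real task would create the key generator once and reuse it for every
record.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat;
    import org.apache.hadoop.zebra.parser.ParseException;
    import org.apache.pig.data.Tuple;

    public class SortedTableJobSketch {
      // Job setup: schema plus sort columns; the one-argument setSortInfo
      // leaves the comparator at its default.
      static void configure(JobConf job) throws IOException, ParseException {
        BasicTableOutputFormat.setOutputPath(job, new Path("/tmp/wordcount"));
        BasicTableOutputFormat.setSchema(job, "word:string, count:int");
        BasicTableOutputFormat.setSortInfo(job, "word");
      }

      // Per record: derive the BytesWritable shuffle key from the tuple's
      // sort columns (a real task would cache the generator).
      static BytesWritable sortKeyFor(JobConf job, Tuple row) throws Exception {
        Object keyGen = BasicTableOutputFormat.getSortKeyGenerator(job);
        return BasicTableOutputFormat.getSortKey(keyGen, row);
      }

      // After all rows are written: seal the table; it is not visible to
      // readers until close() is called (public as of this change).
      static void finish(JobConf job) throws IOException {
        BasicTableOutputFormat.close(job);
      }
    }
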
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java Tue Nov 24 19:54:19 2009
@@ -36,6 +36,7 @@
   private Tuple row;
   private boolean keyReady;
   private boolean rowReady;
+  private int index;
   private TableScanner scanner;
 
   /**
@@ -45,11 +46,12 @@
    *          The scanner to be encapsulated
    * @throws IOException 
    */
-  public CachedTableScanner(TableScanner scanner) throws IOException {
+  public CachedTableScanner(TableScanner scanner, int index) throws IOException {
     key = new BytesWritable();
     row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
     keyReady = false;
     rowReady = false;
+    this.index = index;
     this.scanner = scanner;
   }
 
@@ -82,6 +84,15 @@
   }
 
   /**
+   * Get the index of this table in a table union
+   *
+   * @return the table's index in the union
+   */
+  public int getIndex() {
+    return index;
+  }
+
+  /**
    * Seek to a row whose key is greater than or equal to the input key.
    * 
    * @param inKey

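The new index field on CachedTableScanner tags each member of a table union
with its ordinal, so a row surfaced during a sorted merge can be traced back
to its source table via getIndex(). A small sketch of that intent follows;
the wrapper method is invented, and only the two-argument constructor and
getIndex() come from this diff (the class lives in
org.apache.hadoop.zebra.mapred, so a real caller would too).

    import java.io.IOException;

    import org.apache.hadoop.zebra.io.TableScanner;

    class UnionTagSketch {
      // Wrap each member scanner of a union with its position i; later,
      // tagged[i].getIndex() identifies the origin table of a merged row.
      static CachedTableScanner[] wrap(TableScanner[] members) throws IOException {
        CachedTableScanner[] tagged = new CachedTableScanner[members.length];
        for (int i = 0; i < members.length; ++i) {
          tagged[i] = new CachedTableScanner(members[i], i);
        }
        return tagged;
      }
    }
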
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Tue Nov 24 19:54:19 2009
@@ -34,6 +34,7 @@
  * Table Expression - expression to describe an input table.
  */
 abstract class TableExpr {
+  private boolean sorted = false;
+
   /**
    * Factory method to create a TableExpr from a string.
    * 
@@ -132,6 +133,26 @@
   }
   
   /**
+   * Get a scanner with an unsorted split.
+   * 
+   * @param split
+   *          The row split.
+   * @param projection
+   *          The projection schema. It should never be null.
+   * @param conf
+   *          The configuration
+   * @return A table scanner.
+   * @throws IOException
+   */
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), conf);
+    reader.setProjection(projection);
+    return reader.getScanner(true, split.getSplit());
+  }
+  
+  /**
    * A leaf table corresponds to a materialized table. It is represented by the
    * path to the BasicTable and the projection.
    */
@@ -179,7 +200,14 @@
    * @return Whether this expression may only be split by key.
    */
   public boolean sortedSplitRequired() {
-    return false;
+    return sorted;
+  }
+
+  /**
+   * Set the requirement for a sorted split on this table expression
+   */
+  public void setSortedSplit() {
+    sorted = true;
   }
 
   /**