Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/21 16:36:14 UTC

svn commit: r882929 [1/7] - in /hadoop/pig/branches/branch-0.6/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/schema/ src/java/org/apache/hadoop/zebra/tfile/ src/java/org...

Author: gates
Date: Sat Nov 21 15:36:12 2009
New Revision: 882929

URL: http://svn.apache.org/viewvc?rev=882929&view=rev
Log:
PIG-1077 Support record(row)-based file split in Zebra's TableInputFormat (chaow via gates)

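In brief, this patch changes how an unsorted Zebra table is split for MapReduce: rather than slicing every column group by key range, TableInputFormat lets FileInputFormat compute byte-offset splits over one chosen column group, then converts each byte range into a row range that all column groups honor identically. Driver code is unchanged; below is a minimal, hypothetical setup sketch. The setInputPaths/setProjection setters come from TableInputFormat's public API rather than from this diff, and the table path and column names are made up.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.zebra.mapred.TableInputFormat;

    public class GetRowSplitsSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.setInputFormat(TableInputFormat.class);
        TableInputFormat.setInputPaths(conf, new Path("/path/to/zebra/table"));
        TableInputFormat.setProjection(conf, "a,b"); // hypothetical columns

        // For an unsorted table, getSplits() now routes through getRowSplits():
        // FileInputFormat produces byte-offset FileSplits over the largest
        // projected column group, and each one is re-expressed as a row range
        // (see the TableInputFormat diff below).
        InputSplit[] splits = new TableInputFormat().getSplits(conf, 10);
        System.out.println(splits.length + " row-based splits");
      }
    }
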
Added:
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Chunk.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/CompareUtils.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Compression.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockAlreadyExists.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockDoesNotExist.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/RawComparable.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/SimpleBufferedOutputStream.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTfileSplit.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/KVGenerator.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/KeySampler.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/NanoTimer.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/RandomDistribution.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFile.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileComparators.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileJClassComparatorByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileLzoCodecsStreams.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileNoneCodecsStreams.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeek.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSeqFileComparison.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileSplit.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileStreams.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestTFileUnsortedByteArrays.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/TestVLong.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/tfile/Timer.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java
Modified:
    hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMap.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestMapOfRecord.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNegative.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestRecord.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestSimple.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ArticleGenerator.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/Dictionary.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMRSample2.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TableMapReduceExample.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestCheckin.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestUnionMixedTypes.java

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Sat Nov 21 15:36:12 2009
@@ -6,6 +6,9 @@
 
   IMPROVEMENTS
 
+    PIG-1077 Support record(row)-based file split in Zebra's
+             TableInputFormat (chaow via gates)
+
     PIG-1089 Use Pig's version for Zebra's own version (chaow via olgan)
 
     PIG-1069  Order Preserving Sorted Table Union (yanz via gates)

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml Sat Nov 21 15:36:12 2009
@@ -34,6 +34,10 @@
   	<manifest>
 	    <attribute name="Main-Class" value="org.apache.hadoop.zebra.io.BasicTable"/>
 	</manifest>
+
+        <zipfileset src="${pig.root}/lib/${hadoop.jarfile}">
+            <include name="${hadoop.includes}" />
+        </zipfileset>
     </jar>
   </target>
 

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Sat Nov 21 15:36:12 2009
@@ -40,15 +40,18 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
 import org.apache.hadoop.zebra.types.CGSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
@@ -359,6 +362,13 @@
     {
       return schemaFile.getSortInfo();
     }
+    
+    /**
+     * @return the name of the i-th column group
+     */
+    public String getName(int i) {
+      return schemaFile.getName(i);
+    }
 
     /**
      * Set the projection for the reader. This will affect calls to
@@ -418,13 +428,31 @@
       BlockDistribution bd = new BlockDistribution();
       for (int nx = 0; nx < colGroups.length; nx++) {
         if (!isCGDeleted(nx)) {
-          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split
-            .get(nx)));
+          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
         }
       }
       return bd;
     }
 
+
+    /**
+     * Given a row-based split, calculate how the file data that fall into the split
+     * are distributed among hosts.
+     * 
+     * @param split The row-based split. <i>Cannot</i> be null.
+     * @return An object that conveys how the blocks that fall into the split
+     *         are distributed across hosts.
+     * @see #rowSplit(int)
+     */
+    public BlockDistribution getBlockDistribution(RowSplit split)
+        throws IOException {
+      BlockDistribution bd = new BlockDistribution();
+      int cgIdx = split.getCGIndex();      
+      bd.add(colGroups[cgIdx].getBlockDistribution(split.getCGRowSplit()));
+      
+      return bd;
+    }
+
     /**
      * Collect some key samples and use them to partition the table. Only
      * applicable to sorted BasicTable. The returned {@link KeyDistribution}
@@ -502,6 +530,26 @@
     }
 
     /**
+     * Get a scanner that reads a consecutive range of rows as defined in the
+     * {@link RowSplit} object.
+     * 
+     * @param closeReader
+     *          close the underlying Reader object when we close the scanner.
+     *          Should be set to true if we have only one scanner on top of the
+     *          reader, so that we should release resources after the scanner is
+     *          closed.
+     * @param rowSplit split based on row numbers.
+     * 
+     * @return A scanner object.
+     * @throws IOException
+     */
+    public synchronized TableScanner getScanner(boolean closeReader,
+                                                RowSplit rowSplit) 
+      throws IOException, ParseException {
+      checkInferredMapping();
+      return new BTScanner(rowSplit, closeReader, partition);
+    }
+    /**
      * Get the schema of the table. The schema may be different from
      * {@link BasicTable.Reader#getSchema(Path, Configuration)} if a projection
      * has been set on the table.
@@ -535,61 +583,106 @@
     public String getPath() {
       return path.toString();
     }
+    
+    /**
+     * Get the path filter used by the table.
+     */
+    public PathFilter getPathFilter(Configuration conf) {
+      ColumnGroup.CGPathFilter filter = new ColumnGroup.CGPathFilter();
+      ColumnGroup.CGPathFilter.setConf(conf);
+      return filter;
+    }
 
     /**
      * Split the table into at most n parts.
      * 
-     * @param n
-     *          Maximum number of parts in the output list.
+     * @param n Maximum number of parts in the output list.
      * @return A list of RangeSplit objects, each of which can be used to
      *         construct TableScanner later.
      */
-    @SuppressWarnings("unchecked")
     public List<RangeSplit> rangeSplit(int n) throws IOException {
-      // assume all CGs will be split into the same number of horizontal
-      // slices
-      List<CGRangeSplit>[] cgSplitsAll = new ArrayList[colGroups.length];
-      // split each CG
-      for (int nx = 0; nx < colGroups.length; nx++) {
-        if (!isCGDeleted(nx))
-          cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
-      }
-
-      // verify all CGs have same number of slices
-      int numSlices = -1;
-      for (int nx = 0; nx < cgSplitsAll.length; nx++) {
-        if (isCGDeleted(nx)) {
-          continue;
-        }
-        if (numSlices < 0) {
-          numSlices = cgSplitsAll[nx].size();
-        }
-        else if (cgSplitsAll[nx].size() != numSlices) {
-          throw new IOException(
-              "BasicTable's column groups were not equally split.");
+      // Use the first non-deleted column group to do the split; other column groups will be split exactly the same way.
+      List<RangeSplit> ret;
+      if (firstValidCG >= 0) {
+        List<CGRangeSplit> cgSplits = colGroups[firstValidCG].rangeSplit(n);
+        int numSlices = cgSplits.size();
+        ret = new ArrayList<RangeSplit>(numSlices);
+        for (int slice = 0; slice < numSlices; slice++) {
+          CGRangeSplit oneSliceSplit = cgSplits.get(slice);
+          ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplit));
         }
+
+        return ret;
+      } else { // all column groups are dropped.
+        ret = new ArrayList<RangeSplit>(1);
+        // add a dummy split
+        ret.add(new BasicTable.Reader.RangeSplit(new CGRangeSplit(0, 0)));
+        return ret;
       }
-      if (numSlices <= 0) {
-        // This could happen because of various reasons.
-        // One possibility is that all the CGs are deleted.
-        numSlices = 1;
-      }
-      // return horizontal slices as RangeSplits
-      List<RangeSplit> ret = new ArrayList<RangeSplit>(numSlices);
+    }
+
+    /**
+     * We already use FileInputFormat to create byte offset-based input splits.
+     * Their information is encoded in starts, lengths and paths. This method
+     * wraps that information into RowSplit objects at the basic table level.
+     * 
+     * @param starts array of starting byte of fileSplits.
+     * @param lengths array of length of fileSplits.
+     * @param paths array of path of fileSplits.
+     * @param splitCGIndex index of column group that is used to create fileSplits.
+     * @return A list of RowSplit objects, each of which can be used to
+     *         construct a TableScanner later. 
+     *         
+     */
+    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex) throws IOException {
+      List<RowSplit> ret; 
+      
+      List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths);
+      int numSlices = cgSplits.size();
+      ret = new ArrayList<RowSplit>(numSlices);
       for (int slice = 0; slice < numSlices; slice++) {
-        CGRangeSplit[] oneSliceSplits = new CGRangeSplit[cgSplitsAll.length];
-        for (int cgIndex = 0; cgIndex < cgSplitsAll.length; cgIndex++) {
-          if (isCGDeleted(cgIndex)) {
-            // set a dummy split
-            oneSliceSplits[cgIndex] = new CGRangeSplit(0, 0);
-          } else {
-            oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
-          }
-        }
-        ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplits));
+        CGRowSplit cgRowSplit = cgSplits.get(slice);
+        ret.add(new BasicTable.Reader.RowSplit(splitCGIndex, cgRowSplit));
       }
+        
       return ret;
     }
+    
+
+    /** 
+     * Get the index of the column group that will be used for the row-based split.
+     * 
+     */
+    public int getRowSplitCGIndex() {
+      // Try to find the largest column group that is non-deleted and used by the projection.
+      int largestCGIndex = -1;
+      int splitCGIndex = -1;
+      long largestCGSize = -1;
+      for (int i=0; i<colGroups.length; i++) {
+        if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
+          continue;
+        }
+        ColumnGroup.Reader reader = colGroups[i];
+        BasicTableStatus btStatus = reader.getStatus();
+        long size = btStatus.getSize();
+        if (size > largestCGSize) {
+          largestCGIndex = i;
+          largestCGSize = size;
+        }
+      }
+     
+      /* We do have a largest non-deleted and used column group,
+      and we use it to do the split. */
+      if (largestCGIndex >= 0) { 
+        splitCGIndex = largestCGIndex;
+      } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existent,
+                                      then we use the first non-deleted column group to do the split, if it exists. */
+        splitCGIndex = firstValidCG; 
+      } 
+     
+      return splitCGIndex;
+    }
+
 
     /**
      * Close the BasicTable for reading. Resources are released.
@@ -671,10 +764,11 @@
      * implementation-dependent.
      */
     public static class RangeSplit implements Writable {
-      CGRangeSplit[] slice;
+      //CGRangeSplit[] slice;
+      CGRangeSplit slice;
 
-      RangeSplit(CGRangeSplit[] splits) {
-        slice = splits;
+      RangeSplit(CGRangeSplit split) {
+        slice = split;
       }
 
       /**
@@ -689,12 +783,10 @@
        */
       @Override
       public void readFields(DataInput in) throws IOException {
-        int count = Utils.readVInt(in);
-        slice = new CGRangeSplit[count];
-        for (int nx = 0; nx < count; nx++) {
+        for (int nx = 0; nx < 1; nx++) {
           CGRangeSplit cgrs = new CGRangeSplit();
           cgrs.readFields(in);
-          slice[nx] = cgrs;
+          slice = cgrs;
         }
       }
 
@@ -703,18 +795,81 @@
        */
       @Override
       public void write(DataOutput out) throws IOException {
-        Utils.writeVInt(out, slice.length);
-        for (CGRangeSplit split : slice) {
-          split.write(out);
-        }
+        //Utils.writeVInt(out, slice.length);
+        //for (CGRangeSplit split : slice) {
+        //  split.write(out);
+        //}
+        slice.write(out);
       }
 
-      CGRangeSplit get(int index) {
-        return slice[index];
+      //CGRangeSplit get(int index) {
+       // return slice[index];
+      //}
+      
+      CGRangeSplit getCGRangeSplit() {
+        return slice;
       }
     }
 
     /**
+     * A row-based split on the zebra table.
+     */
+    public static class RowSplit implements Writable {
+      int cgIndex;  // index of the column group that the split lies in
+      CGRowSplit slice; 
+
+      RowSplit(int cgidx, CGRowSplit split) {
+        this.cgIndex = cgidx;
+        this.slice = split;
+      }
+
+      /**
+       * Default constructor.
+       */
+      public RowSplit() {
+        // no-op
+      }
+      
+      @Override
+      public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{cgIndex = " + cgIndex + "}\n");
+        sb.append(slice.toString());
+        
+        return sb.toString();
+      }
+
+      /**
+       * @see Writable#readFields(DataInput)
+       */
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        this.cgIndex = Utils.readVInt(in);
+        CGRowSplit cgrs = new CGRowSplit();
+        cgrs.readFields(in);
+        this.slice = cgrs;
+      }
+
+      /**
+       * @see Writable#write(DataOutput)
+       */
+      @Override
+      public void write(DataOutput out) throws IOException {
+        Utils.writeVInt(out, cgIndex);
+        slice.write(out);
+      }
+      
+      int getCGIndex() {
+        return cgIndex;
+      }
+
+      CGRowSplit getCGRowSplit() {
+        return slice;
+      }
+    }
+    
+    
+    /**
      * BasicTable scanner class
      */
     private class BTScanner implements TableScanner {
@@ -733,74 +888,106 @@
       }
 
       public BTScanner(BytesWritable beginKey, BytesWritable endKey,
-          boolean closeReader, Partition partition) throws IOException {
-        this.partition = partition;
-        boolean anyScanner = false;
-        try {
-          schema = partition.getProjection();
-          cgScanners = new TableScanner[colGroups.length];
-          for (int i = 0; i < colGroups.length; ++i) {
-            if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
-            {
-              anyScanner = true;
-              cgScanners[i] = colGroups[i].getScanner(beginKey, endKey, false);
-            } else
-              cgScanners[i] = null;
-          }
-          if (!anyScanner && firstValidCG >= 0) {
-            // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            cgScanners[firstValidCG] = colGroups[firstValidCG].
-                                         getScanner(beginKey, endKey, false);
-          }
-          this.closeReader = closeReader;
-          sClosed = false;
-        }
-        catch (Exception e) {
-          throw new IOException("BTScanner constructor failed : "
-              + e.getMessage());
+        boolean closeReader, Partition partition) throws IOException {
+        init(null, null, beginKey, endKey, closeReader, partition);
+      }
+      
+      public BTScanner(RangeSplit split, Partition partition,
+        boolean closeReader) throws IOException {
+        init(null, split, null, null, closeReader, partition);
+      }
+      
+      public BTScanner(RowSplit rowSplit,  boolean closeReader, 
+                       Partition partition) throws IOException {
+        init(rowSplit, null, null, null, closeReader, partition);
+      }      
+      
+      /**
+       * Creates a new CGRowSplit. If startRow in rowSplit is not set
+       * (i.e. < 0), it sets startRow and numRows based on 'startByte'
+       * and 'numBytes' from the given rowSplit.
+       */
+      private CGRowSplit makeCGRowSplit(RowSplit rowSplit) throws IOException {
+        CGRowSplit inputCGSplit = rowSplit.getCGRowSplit(); 
+
+        int cgIdx = rowSplit.getCGIndex();
+        
+        CGRowSplit cgSplit = new CGRowSplit();
+        cgSplit.fileIndex = inputCGSplit.fileIndex;
+        // startByte and numBytes from inputCGSplit are ignored, since
+        // they make sense for only one CG.
+        cgSplit.startRow = inputCGSplit.startRow;
+        cgSplit.numRows = inputCGSplit.numRows;
+        
+        if (cgSplit.startRow >= 0) {
+          //assume the rows are already set up.
+          return cgSplit;
         }
-        finally {
-          if (sClosed) {
-            if (cgScanners != null) {
-              for (int i = 0; i < cgScanners.length; ++i) {
-                if (cgScanners[i] != null) {
-                  try {
-                    cgScanners[i].close();
-                    cgScanners[i] = null;
-                  }
-                  catch (Exception e) {
-                    // no-op
-                  }
-                }
-              }
-            }
-          }
+        
+        // Find the row range :
+        if (isCGDeleted(cgIdx)) {
+          throw new IOException("CG " + cgIdx + " is deleted.");
         }
+        
+        //fill the row numbers.
+        colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit.startByte,
+                                      inputCGSplit.numBytes);
+        return cgSplit;
+      }
+    
+      // Helper function for initialization.
+      private TableScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit, 
+                                           RangeSplit rangeSplit,
+                                           BytesWritable beginKey, 
+                                           BytesWritable endKey) 
+                      throws IOException, 
+                             ParseException {
+        if (cgRowSplit != null) {
+          return colGroups[cgIndex].getScanner(false, cgRowSplit);
+        }      
+        if (beginKey != null || endKey != null) {
+          return colGroups[cgIndex].getScanner(beginKey, endKey, false);
+        }
+        return colGroups[cgIndex].getScanner
+                ((rangeSplit == null ? null : rangeSplit.getCGRangeSplit()), 
+                 false);
       }
-
-      public BTScanner(RangeSplit split, Partition partition,
-          boolean closeReader) throws IOException {
+      
+      /**
+       * If rowSplit is not null, scanners are created based on the
+       * row range. <br>
+       * Otherwise, if rangeSplit is not null, scanners are based on the range; <br>
+       * otherwise, they are based on keys.
+       */
+      private void init(RowSplit rowSplit, RangeSplit rangeSplit,
+                   BytesWritable beginKey, BytesWritable endKey, 
+                   boolean closeReader, Partition partition) throws IOException {
+        this.partition = partition;
+        boolean anyScanner = false;
+        
+        CGRowSplit cgRowSplit = null;
+        if (rowSplit != null) {
+          cgRowSplit = makeCGRowSplit(rowSplit);
+        }
+        
         try {
           schema = partition.getProjection();
           cgScanners = new TableScanner[colGroups.length];
-          boolean anyScanner = false;
           for (int i = 0; i < colGroups.length; ++i) {
-            // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            if (!isCGDeleted(i) && partition.isCGNeeded(i))
+            if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
             {
-              cgScanners[i] =
-                  colGroups[i].getScanner(split == null ? null : split.get(i),
-                      false);
               anyScanner = true;
+              cgScanners[i] = createCGScanner(i, cgRowSplit, rangeSplit,
+                                              beginKey, endKey);                                                             
             } else
               cgScanners[i] = null;
           }
           if (!anyScanner && firstValidCG >= 0) {
             // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
-            cgScanners[firstValidCG] = colGroups[firstValidCG].
-              getScanner(split == null ? null : split.get(firstValidCG), false);
+            cgScanners[firstValidCG] = createCGScanner(firstValidCG, cgRowSplit, 
+                                                       rangeSplit,
+                                                       beginKey, endKey);            
           }
-          this.partition = partition;
           this.closeReader = closeReader;
           sClosed = false;
         }

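The new Reader methods above compose as follows. This is a rough editorial sketch, not code from the patch: it assumes an existing table at tablePath, at least one resulting split, and byte ranges (starts, lengths, paths) obtained from FileInputFormat as described in the rowSplit javadoc.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.io.BasicTable;
    import org.apache.hadoop.zebra.io.BasicTable.Reader.RowSplit;
    import org.apache.hadoop.zebra.io.BlockDistribution;
    import org.apache.hadoop.zebra.io.TableScanner;

    public class ReaderRowSplitSketch {
      public static void sketch(Path tablePath, long[] starts, long[] lengths,
                                Path[] paths, Configuration conf) throws Exception {
        BasicTable.Reader reader = new BasicTable.Reader(tablePath, conf);
        // Split on the largest non-deleted column group used by the projection.
        int cgIndex = reader.getRowSplitCGIndex();
        // Re-express the byte-offset FileSplits as row-based table splits.
        List<RowSplit> splits = reader.rowSplit(starts, lengths, paths, cgIndex);
        for (RowSplit s : splits) {
          BlockDistribution bd = reader.getBlockDistribution(s);
          System.out.println(s + " -> " + bd.getLength() + " bytes");
        }
        // closeReader=true ties the reader's lifetime to this single scanner.
        TableScanner scanner = reader.getScanner(true, splits.get(0));
        scanner.close();
      }
    }
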
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java Sat Nov 21 15:36:12 2009
@@ -22,7 +22,7 @@
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
 
 /**
  * Status of a BasicTable. The status may be reported under some projection.

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Sat Nov 21 15:36:12 2009
@@ -37,6 +37,8 @@
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.fs.BlockLocation;
@@ -48,10 +50,10 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.ByteArray;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.ByteArray;
+import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.types.CGSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
@@ -78,6 +80,8 @@
  *      </ul>
  */
 class ColumnGroup {
+  static Log LOG = LogFactory.getLog(ColumnGroup.class);
+  
   private final static String CONF_COMPRESS = "table.output.tfile.compression";
   private final static String DEFAULT_COMPRESS = "gz";
   private final static String CONF_MIN_BLOCK_SIZE = "table.tfile.minblock.size";
@@ -156,7 +160,10 @@
   static CGIndex buildIndex(FileSystem fs, Path path, boolean dirty,
       Configuration conf) throws IOException {
     CGIndex ret = new CGIndex();
-    FileStatus[] files = fs.listStatus(path, new CGPathFilter(conf));
+    CGPathFilter cgPathFilter = new CGPathFilter();
+    CGPathFilter.setConf(conf);
+    FileStatus[] files = fs.listStatus(path, cgPathFilter);
+    
     Comparator<RawComparable> comparator = null;
     for (FileStatus f : files) {
       if (dirty) {
@@ -194,9 +201,15 @@
     }
 
     ret.sort(comparator);
+    
+    int idx = 0;
+    for (CGIndexEntry e : ret.getIndex()) {
+      e.setIndex(idx++);    
+    }
+    
     return ret;
-  }
-
+  }   
+  
   /**
    * ColumnGroup reader.
    */
@@ -266,7 +279,8 @@
       }
       projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
       Path metaFilePath = makeMetaFilePath(path);
-      if (!fs.exists(metaFilePath)) {
+      /* If the index file does not exist, or we are loading from an unsorted table. */
+      if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
         // special case for unsorted CG that did not create index properly.
         if (cgschema.isSorted()) {
           throw new FileNotFoundException(
@@ -425,6 +439,27 @@
     }
 
     /**
+     * Get a scanner that reads the rows defined by rowRange. 
+     * 
+     * @param closeReader
+     *          close the underlying Reader object when we close the scanner.
+     *          Should be set to true if we have only one scanner on top of the
+     *          reader, so that we should release resources after the scanner is
+     *          closed.
+     * @param rowSplit specifies part index, start row, and end row.
+     * @return A scanner object.
+     */
+    public synchronized TableScanner getScanner(boolean closeReader, 
+                                                CGRowSplit rowSplit)
+                        throws IOException, ParseException {
+      if (closed) {
+        throw new EOFException("Reader already closed");
+      }
+      
+      return new CGScanner(rowSplit, closeReader);
+    }
+     
+    /**
      * Given a split range, calculate how the file data that fall into the range
      * are distributed among hosts.
      * 
@@ -457,7 +492,69 @@
 
       return ret;
     }
+    
+    /**
+     * Given a row range, calculate how the file data that fall into the range
+     * are distributed among hosts.
+     * 
+     * @param split
+     *          The row-based split. <i>Cannot</i> be null.
+     * @return a map from host name to the amount of data (in bytes) the host
+     *         owns that fall roughly into the row range.
+     */
+    public BlockDistribution getBlockDistribution(CGRowSplit split)
+        throws IOException {
+      if (split == null) {
+        throw new IOException("Row-based split cannot be null for getBlockDistribution()");
+      }
+
+      BlockDistribution ret = new BlockDistribution();
+      CGIndexEntry entry = cgindex.get(split.fileIndex);
+      FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
+      
+      BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+      for (BlockLocation l : locations) {
+        ret.add(l);
+      }
+      
+      return ret;
+    }
+
+   /**
+    * Sets startRow and number of rows in rowSplit based on
+    * startOffset and length.
+    * 
+    * It is assumed that 'startByte' and 'numBytes' in rowSplit itself
+    * are not valid.
+    */
+    void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
+                      throws IOException {
 
+      Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
+      FileStatus tfile = fs.getFileStatus(tfPath);
+
+      TFile.Reader reader = null;
+      
+      try {
+        reader = new TFile.Reader(fs.open(tfPath),
+                                  tfile.getLen(), conf);
+
+        long startRow = reader.getRecordNumNear(startOffset);
+        long endRow = reader.getRecordNumNear(startOffset + length);
+
+        if (endRow < startRow) {
+          endRow = startRow;
+        }
+
+        rowSplit.startRow = startRow;
+        rowSplit.numRows = endRow - startRow;
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+    
     /**
      * Get a sampling of keys and calculate how data are distributed among
      * key-partitioned buckets. The implementation attempts to calculate all
@@ -633,6 +730,32 @@
     }
 
     /**
+     * We already use FileInputFormat to create byte offset-based input splits.
+     * Their information is encoded in starts, lengths and paths. This method
+     * wraps that information into CGRowSplit objects at the column group level.
+     * 
+     * @param starts array of starting byte of fileSplits.
+     * @param lengths array of length of fileSplits.
+     * @param paths array of path of fileSplits.
+     * @return A list of CGRowSplit objects. 
+     *         
+     */
+    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths) throws IOException {
+      List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
+       
+      for (int i=0; i<starts.length; i++) {
+        long start = starts[i];
+        long length = lengths[i];
+        Path path = paths[i];
+        int idx = cgindex.getFileIndex(path);        
+        
+        lst.add(new CGRowSplit(idx, start, length));
+      }
+      
+      return lst;
+    } 
+    
+    /**
      * Is the ColumnGroup sorted?
      * 
      * @return Whether the ColumnGroup is sorted.
@@ -659,8 +782,9 @@
       TFile.Reader.Scanner scanner;
       TupleReader tupleReader;
 
-      TFileScanner(FileSystem fs, Path path, RawComparable begin,
-          RawComparable end, CGSchema cgschema, Projection projection,
+      TFileScanner(FileSystem fs, Path path, CGRowSplit rowRange, 
+                    RawComparable begin, RawComparable end, 
+                    CGSchema cgschema, Projection projection,
           Configuration conf) throws IOException, ParseException {
         try {
           ins = fs.open(path);
@@ -668,7 +792,17 @@
            * compressor is inside cgschema
            */
           reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
-          scanner = reader.createScanner(begin, end);
+          if (rowRange != null && rowRange.fileIndex >= 0) {
+            scanner = reader.createScannerByRecordNum(rowRange.startRow, 
+                                         rowRange.startRow + rowRange.numRows);
+          } else {
+            /* using deprecated API just so that zebra can work with 
+             * hadoop jar that does not contain HADOOP-6218 (Record ids for
+             * TFile). This is expected to be temporary. Later we should 
+             * use the undeprecated API.
+             */
+            scanner = reader.createScanner(begin, end);
+          }
           /*
            * serializer is inside cgschema: different serializer will require
            * different Reader: for pig, it's TupleReader
@@ -791,9 +925,29 @@
           beginIndex = split.start;
           endIndex = split.start + split.len;
         }
-        init(null, null, closeReader);
+        init(null, null, null, closeReader);
       }
-
+      
+      /**
+       * Scanner for the rows specified by the given row range.
+       * 
+       * @param rowRange see {@link CGRowSplit}
+       * @param closeReader
+       */
+      CGScanner(CGRowSplit rowRange, boolean closeReader) 
+                 throws IOException, ParseException {
+        beginIndex = 0;
+        endIndex = cgindex.size();
+        if (rowRange != null && rowRange.fileIndex >= 0) {
+          if (rowRange.fileIndex >= cgindex.size()) {
+            throw new IllegalArgumentException("Part Index is out of range.");
+          }
+          beginIndex = rowRange.fileIndex;
+          endIndex = beginIndex+1;
+        }
+        init(rowRange, null, null, closeReader);
+      }
+      
       CGScanner(RawComparable beginKey, RawComparable endKey,
           boolean closeReader) throws IOException, ParseException {
         beginIndex = 0;
@@ -807,11 +961,12 @@
             ++endIndex;
           }
         }
-        init(beginKey, endKey, closeReader);
+        init(null, beginKey, endKey, closeReader);
       }
 
-      private void init(RawComparable beginKey, RawComparable endKey,
-          boolean doClose) throws IOException, ParseException {
+      private void init(CGRowSplit rowRange, RawComparable beginKey, 
+                        RawComparable endKey, boolean doClose) 
+             throws IOException, ParseException {
         if (beginIndex > endIndex) {
           throw new IllegalArgumentException("beginIndex > endIndex");
         }
@@ -823,8 +978,9 @@
             RawComparable begin = (i == beginIndex) ? beginKey : null;
             RawComparable end = (i == endIndex - 1) ? endKey : null;
             TFileScanner scanner =
-                new TFileScanner(fs, cgindex.getPath(i, path), begin, end,
-                    cgschema, logicalSchema, conf);
+                new TFileScanner(fs, cgindex.getPath(i, path), rowRange, 
+                                 begin, end,
+                                 cgschema, logicalSchema, conf);
             // skip empty scanners.
             if (!scanner.atEnd()) {
               tmpScanners.add(scanner);
@@ -971,7 +1127,55 @@
         Utils.writeVInt(out, len);
       }
     }
+    
+    public static class CGRowSplit implements Writable {
+      int fileIndex = -1;
+      long startByte = -1;
+      long numBytes = -1;
+      long startRow = -1;
+      long numRows = -1;
+
+      CGRowSplit(int fileIdx, long start, long len) {
+        this.fileIndex = fileIdx;
+        this.startByte = start;
+        this.numBytes = len;
+      }
+
+      public CGRowSplit() {
+        // no-op;
+      }
+      
+      @Override
+      public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{fileIndex = " + fileIndex + "}\n");       
+        sb.append("{startByte = " + startByte + "}\n");
+        sb.append("{numBytes = " + numBytes + "}\n");
+        sb.append("{startRow = " + startRow + "}\n");
+        sb.append("{numRows = " + numRows + "}\n");
+        
+        return sb.toString();
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        fileIndex = Utils.readVInt(in);
+        startByte = Utils.readVLong(in);
+        numBytes = Utils.readVLong(in);
+        startRow = Utils.readVLong(in);
+        numRows = Utils.readVLong(in);
+      }
 
+      @Override
+      public void write(DataOutput out) throws IOException {
+        Utils.writeVInt(out, fileIndex);
+        Utils.writeVLong(out, startByte);
+        Utils.writeVLong(out, numBytes);
+        Utils.writeVLong(out, startRow);
+        Utils.writeVLong(out, numRows);
+      }      
+    }
+    
     private static class SplitColumn {
       SplitColumn(Partition.SplitType st) {
         this.st = st;
@@ -1209,13 +1413,23 @@
     private void createIndex() throws IOException {
       MetaFile.Writer metaFile =
           MetaFile.createWriter(makeMetaFilePath(path), conf);
-      CGIndex index = buildIndex(fs, path, false, conf);
-      DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-      try {
-        index.write(dos);
-      }
-      finally {
-        dos.close();
+      if (cgschema.isSorted()) {
+        CGIndex index = buildIndex(fs, path, false, conf);
+        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+        try {
+          index.write(dos);
+        }
+        finally {
+          dos.close();
+        }
+      } else { /* Create an empty index meta block for an unsorted table. */
+        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+        try {
+          Utils.writeString(dos, "");
+        } 
+        finally {
+          dos.close();
+        }
       }
       metaFile.close();
     }
@@ -1442,18 +1656,19 @@
    * name, first and last key (inclusive) of a data file
    */
   static class CGIndexEntry implements RawComparable, Writable {
+    int index;
     String name;
     long rows;
     RawComparable firstKey;
     RawComparable lastKey;
 
     // for reading
-    CGIndexEntry() {
+    public CGIndexEntry() {
       // no-op
     }
 
     // for writing
-    CGIndexEntry(String name, long rows, RawComparable firstKey,
+    public CGIndexEntry(String name, long rows, RawComparable firstKey,
         RawComparable lastKey) {
       this.name = name;
       this.rows = rows;
@@ -1461,6 +1676,10 @@
       this.lastKey = lastKey;
     }
 
+    public int getIndex() {
+      return index;
+    }
+    
     public String getName() {
       return name;
     }
@@ -1476,6 +1695,10 @@
     public RawComparable getLastKey() {
       return lastKey;
     }
+    
+    void setIndex (int idx) {
+      this.index = idx;
+    }
 
     @Override
     public byte[] buffer() {
@@ -1538,6 +1761,16 @@
       status = new BasicTableStatus();
       index = new ArrayList<CGIndexEntry>();
     }
+    
+    int getFileIndex(Path path) throws IOException {
+      String filename = path.getName();
+      for (CGIndexEntry cgie : index) {
+        if (cgie.getName().equals(filename)) {
+          return cgie.getIndex(); 
+        }
+      }
+      throw new IOException("File " + filename + " is not in the column group index"); 
+    }
 
     int size() {
       return index.size();
@@ -1692,16 +1925,17 @@
     }
   }
 
-  static class CGPathFilter implements PathFilter {
-    private final Configuration conf;
-
-    CGPathFilter(Configuration conf) {
-      this.conf = conf;
+  public static class CGPathFilter implements PathFilter {
+    private static Configuration conf;
+   
+    public static void setConf(Configuration c) {
+      conf = c;
     }
 
     public boolean accept(Path p) {
       return p.getName().equals(META_FILE) || p.getName().equals(SCHEMA_FILE)
           || p.getName().startsWith(".tmp.")
+          || p.getName().startsWith("ttt")
           || p.getName().startsWith(getNonDataFilePrefix(conf)) ? false : true;
     }
   }

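The heart of the change is fillRowSplit above: the TFile copy bundled under org.apache.hadoop.zebra.tfile (carrying the HADOOP-6218 record-number API) can map a byte offset to the nearest record number, which turns an arbitrary byte-offset split into a row range. A standalone sketch of that mapping, with a hypothetical TFile path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.zebra.tfile.TFile;

    public class ByteRangeToRowRange {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path tfPath = new Path(args[0]); // a TFile inside a column group directory
        FileSystem fs = tfPath.getFileSystem(conf);
        long fileLen = fs.getFileStatus(tfPath).getLen();

        TFile.Reader reader = new TFile.Reader(fs.open(tfPath), fileLen, conf);
        try {
          long start = 0, length = fileLen / 2;     // a byte-offset FileSplit
          long startRow = reader.getRecordNumNear(start);
          long endRow = reader.getRecordNumNear(start + length);
          if (endRow < startRow) endRow = startRow; // same guard as fillRowSplit
          System.out.println("rows [" + startRow + ", " + endRow + ")");
          // A scanner over exactly this row range, as TFileScanner does above:
          TFile.Reader.Scanner scanner =
              reader.createScannerByRecordNum(startRow, endRow);
          scanner.close();
        } finally {
          reader.close();
        }
      }
    }
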
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Sat Nov 21 15:36:12 2009
@@ -23,8 +23,8 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
-import org.apache.hadoop.io.file.tfile.ByteArray;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.ByteArray;
 
 /**
  * Class used to convey the information of how on-disk data are distributed

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java Sat Nov 21 15:36:12 2009
@@ -28,9 +28,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
 
 
 /**

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Sat Nov 21 15:36:12 2009
@@ -133,6 +133,26 @@
   }
   
   /**
+   * Get a scanner for a row-based (unsorted) split.
+   * 
+   * @param split
+   *          The row split.
+   * @param projection
+   *          The projection schema. It should never be null.
+   * @param conf
+   *          The configuration
+   * @return A table scanner.
+   * @throws IOException
+   */
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), conf);
+    reader.setProjection(projection);
+    return reader.getScanner(true, split.getSplit());
+  }
+  
+  /**
    * A leaf table corresponds to a materialized table. It is represented by the
    * path to the BasicTable and the projection.
    */

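On the read side, TableExpr.getScanner(RowTableSplit, ...) above returns a scanner restricted to the split's row range. Below is a sketch of how a record reader might drive it; since RowTableSplit is package-private this would live inside org.apache.hadoop.zebra.mapred, and the loop uses the atEnd/getKey/advance methods of zebra's TableScanner as used elsewhere in this patch:

    package org.apache.hadoop.zebra.mapred;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.zebra.io.TableScanner;
    import org.apache.hadoop.zebra.parser.ParseException;

    class RowScanSketch {
      static long countRows(TableExpr expr, RowTableSplit split, String projection,
                            Configuration conf) throws IOException, ParseException {
        TableScanner scanner = expr.getScanner(split, projection, conf);
        long rows = 0;
        try {
          BytesWritable key = new BytesWritable();
          while (!scanner.atEnd()) {
            scanner.getKey(key); // value retrieval via getValue(Tuple) elided
            ++rows;
            scanner.advance();
          }
        } finally {
          scanner.close();
        }
        return rows;
      }
    }
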
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Sat Nov 21 15:36:12 2009
@@ -24,11 +24,16 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.util.StringUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -38,16 +43,14 @@
 import org.apache.hadoop.zebra.io.BasicTableStatus;
 import org.apache.hadoop.zebra.io.BlockDistribution;
 import org.apache.hadoop.zebra.io.KeyDistribution;
-import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.io.BasicTable.Reader;
 import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RowSplit;
 import org.apache.hadoop.zebra.mapred.TableExpr.LeafTableInfo;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
-import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.hadoop.zebra.types.SortInfo;
-import org.apache.hadoop.zebra.mapred.TableExpr.LeafTableInfo;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -133,6 +136,8 @@
  * </UL>
  */
 public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
+  static Log LOG = LogFactory.getLog(TableInputFormat.class);
+  
   private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
   private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
   private static final String INPUT_SORT = "mapred.lib.table.input.sort";
@@ -162,45 +167,6 @@
     }
   }
   
-  //This method escapes commas in the glob pattern of the given paths. 
-  private static String[] getPathStrings(String commaSeparatedPaths) {
-    int length = commaSeparatedPaths.length();
-    int curlyOpen = 0;
-    int pathStart = 0;
-    boolean globPattern = false;
-    List<String> pathStrings = new ArrayList<String>();
-    
-    for (int i=0; i<length; i++) {
-      char ch = commaSeparatedPaths.charAt(i);
-      switch(ch) {
-        case '{' : {
-          curlyOpen++;
-          if (!globPattern) {
-            globPattern = true;
-          }
-          break;
-        }
-        case '}' : {
-          curlyOpen--;
-          if (curlyOpen == 0 && globPattern) {
-            globPattern = false;
-          }
-          break;
-        }
-        case ',' : {
-          if (!globPattern) {
-            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
-            pathStart = i + 1 ;
-          }
-          break;
-        }
-      }
-    }
-    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
-    
-    return pathStrings.toArray(new String[0]);
-  }
-
   /**
    * Set the input expression in the JobConf object.
    * 
@@ -517,7 +483,7 @@
       BlockDistribution bd = null;
       for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
         BasicTable.Reader reader = it.next();
-        bd = BlockDistribution.sum(bd, reader.getBlockDistribution(null));
+        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
       }
       
       if (bd == null) {
@@ -603,7 +569,7 @@
       numSplits = -1;
     }
 
-    ArrayList<InputSplit> ret = new ArrayList<InputSplit>();;
+    ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     if (numSplits <= 0) {
       for (int i = 0; i < readers.size(); ++i) {
         BasicTable.Reader reader = readers.get(i);
@@ -637,6 +603,68 @@
       }
     }
 
+    LOG.info("getSplits : returning " + ret.size() + " file splits.");
+    return ret.toArray(new InputSplit[ret.size()]);
+  }
+  
+  private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
+    public DummyFileInputFormat(long minSplitSize) {
+      super.setMinSplitSize(minSplitSize);
+    }
+    
+    @Override
+    public RecordReader<BytesWritable, Tuple> getRecordReader(InputSplit split,
+        JobConf conf, Reporter reporter) throws IOException {
+      // no-op
+      return null;
+    }
+  }
+  
+  private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
+      TableExpr expr, List<BasicTable.Reader> readers, 
+      List<BasicTableStatus> status) throws IOException {
+    ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
+    DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
+
+    for (int i = 0; i < readers.size(); ++i) {
+      BasicTable.Reader reader = readers.get(i);
+      /* Get the index of the column group that will be used for row-split.*/
+      int splitCGIndex = reader.getRowSplitCGIndex();
+      
+    /* We can create input splits only if a valid column group exists to split on.
+     * Otherwise, we do not create input splits. */
+      if (splitCGIndex >= 0) {        
+        Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+        DummyFileInputFormat.setInputPaths(conf, path);
+        PathFilter filter = reader.getPathFilter(conf);
+        DummyFileInputFormat.setInputPathFilter(conf, filter.getClass());
+        InputSplit[] inputSplits = helper.getSplits(conf, (numSplits < 1 ? 1 : numSplits));
+        
+        long[] starts = new long[inputSplits.length];
+        long[] lengths = new long[inputSplits.length];
+        Path[] paths = new Path[inputSplits.length];
+        for (int j=0; j<inputSplits.length; j++) {
+          FileSplit fileSplit = (FileSplit) inputSplits[j];
+          Path p = fileSplit.getPath();
+          long start = fileSplit.getStart();
+          long length = fileSplit.getLength();
+
+          starts[j] = start;
+          lengths[j] = length;
+          paths[j] = p;
+        }
+        
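+        /* Map the byte-range splits on the split column group back to
+         * row-based splits over the table's rows. */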
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
+  
+        for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
+          RowSplit subSplit = it.next();
+          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          ret.add(split);
+        }
+      }
+    }
+    
+    LOG.info("getSplits : returning " + ret.size() + " row splits.");
     return ret.toArray(new InputSplit[ret.size()]);
   }
 
@@ -644,8 +672,7 @@
    * @see InputFormat#getSplits(JobConf, int)
    */
   @Override
-  public InputSplit[] getSplits(JobConf conf, int numSplits)
-      throws IOException {
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
     TableExpr expr = getInputExpr(conf);
     if (getSorted(conf))
       expr.setSortedSplit();
@@ -657,7 +684,7 @@
     try {
       projection = getProjection(conf);
     } catch (ParseException e) {
-    	throw new IOException("getProjection failed : "+e.getMessage());
+      throw new IOException("getProjection failed : "+e.getMessage());
     }
     List<LeafTableInfo> leaves = expr.getLeafTables(projection);
     int nLeaves = leaves.size();
@@ -690,9 +717,10 @@
       if (expr.sortedSplitRequired()) {
         return getSortedSplits(conf, numSplits, expr, readers, status);
       }
-      return getUnsortedSplits(conf, numSplits, expr, readers, status);
+
+      return getRowSplits(conf, numSplits, expr, readers, status);
     } catch (ParseException e) {
-    	throw new IOException("Projection parsing failed : "+e.getMessage());
+      throw new IOException("Projection parsing failed : "+e.getMessage());
     }
     finally {
       for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
@@ -700,7 +728,7 @@
           it.next().close();
         }
         catch (Exception e) {
-          // trap errors.
+          e.printStackTrace();
           // TODO: log the error here.
         }
       }
@@ -872,7 +900,6 @@
     }
     hosts = WritableUtils.readStringArray(in);
     length = WritableUtils.readVLong(in);
-
   }
 
   @Override
@@ -897,3 +924,76 @@
     return split;
   }
 }
+
+/**
+ * Adaptor class for row-based InputSplit for table.
+ */
+class RowTableSplit implements InputSplit {
+  String path = null;
+  RowSplit split = null;
+  String[] hosts = null;
+  long length = 1;
+
+  public RowTableSplit(Reader reader, RowSplit split, JobConf conf)
+      throws IOException {
+    this.path = reader.getPath();
+    this.split = split;
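+    // Derive the split length and preferred hosts from the block
+    // distribution, capped at mapred.lib.table.input.nlocation (default 5).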
+    BlockDistribution dataDist = reader.getBlockDistribution(split);
+    if (dataDist != null) {
+      length = dataDist.getLength();
+      hosts =
+          dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5));
+    }
+  }
+  
+  public RowTableSplit() {
+    // no-op for Writable construction
+  }
+  
+  @Override
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return hosts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    path = WritableUtils.readString(in);
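+    // A VInt flag of 1 indicates a serialized RowSplit follows; 0 means none.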
+    int hasSplit = WritableUtils.readVInt(in);
+    if (hasSplit == 1) {
+      if (split == null) split = new RowSplit();
+      split.readFields(in);
+    }
+    else {
+      split = null;
+    }
+    hosts = WritableUtils.readStringArray(in);
+    length = WritableUtils.readVLong(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeString(out, path);
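+    // Mirror readFields(): write 1 if a RowSplit follows, 0 otherwise.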
+    if (split == null) {
+      WritableUtils.writeVInt(out, 0);
+    }
+    else {
+      WritableUtils.writeVInt(out, 1);
+      split.write(out);
+    }
+    WritableUtils.writeStringArray(out, hosts);
+    WritableUtils.writeVLong(out, length);
+  }
+
+  public String getPath() {
+    return path;
+  }
+  
+  public RowSplit getSplit() {
+    return split;
+  }
+}

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Sat Nov 21 15:36:12 2009
@@ -55,6 +55,9 @@
       scanner =
           expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
               conf);
+    } else if (split instanceof RowTableSplit) {
+      RowTableSplit rowSplit = (RowTableSplit) split;
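+      // Row-based splits get their scanner directly from the RowSplit range,
+      // rather than from begin/end keys as in the sorted case above.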
+      scanner = expr.getScanner(rowSplit, projection, conf);
     }
     else {
       UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java Sat Nov 21 15:36:12 2009
@@ -23,7 +23,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
 import org.apache.pig.data.DataType;
 
 /**

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=882929&r1=882928&r2=882929&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Sat Nov 21 15:36:12 2009
@@ -660,13 +660,13 @@
    */
   @Override
   public void readFields(DataInput in) throws IOException {
-    long version = org.apache.hadoop.io.file.tfile.Utils.readVLong(in);
+    long version = org.apache.hadoop.zebra.tfile.Utils.readVLong(in);
 
     if (version > schemaVersion)
       throw new IOException("Schema version is newer than that in software.");
 
     // check-ups are needed for future versions for backward-compatibility
-    String strSchema = org.apache.hadoop.io.file.tfile.Utils.readString(in);
+    String strSchema = org.apache.hadoop.zebra.tfile.Utils.readString(in);
     try {
       init(strSchema, false);
     }
@@ -680,8 +680,8 @@
    */
   @Override
   public void write(DataOutput out) throws IOException {
-    org.apache.hadoop.io.file.tfile.Utils.writeVLong(out, schemaVersion);
-    org.apache.hadoop.io.file.tfile.Utils.writeString(out, toString());
+    org.apache.hadoop.zebra.tfile.Utils.writeVLong(out, schemaVersion);
+    org.apache.hadoop.zebra.tfile.Utils.writeString(out, toString());
   }
 
   private void init(String[] columnNames, boolean virtualColAllowed) throws ParseException {