Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/22 07:26:42 UTC

svn commit: r925971 - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/java/org/apache/hadoop/zebra/tfile/ src/test/org/apache/hadoop/ze...

Author: yanz
Date: Mon Mar 22 06:26:41 2010
New Revision: 925971

URL: http://svn.apache.org/viewvc?rev=925971&view=rev
Log:
PIG-1258 Number of sorted input splits is unusually high (yanz)

Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Mar 22 06:26:41 2010
@@ -66,6 +66,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    PIG-1258 Number of sorted input splits is unusually high (yanz)
+
     PIG-1269 Restrict schema definition for collection (xuefuz via yanz)
 
     PIG-1253: make map/reduce test cases run on real cluster (chaow via yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Mon Mar 22 06:26:41 2010
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.tfile.TFile;
 import org.apache.hadoop.zebra.tfile.Utils;
 import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
@@ -474,21 +473,18 @@ public class BasicTable {
      * 
      * @param n
      *          Targeted size of the sampling.
+     * @param nTables
+     *          Number of tables in union
      * @return KeyDistribution object.
      * @throws IOException
      */
-    public KeyDistribution getKeyDistribution(int n) throws IOException {
-      KeyDistribution kd =
-          new KeyDistribution(TFile.makeComparator(schemaFile.getComparator()));
-      for (int nx = 0; nx < colGroups.length; nx++) {
-        if (!isCGDeleted(nx)) {
-           kd.add(colGroups[nx].getKeyDistribution(n));
-        }
-      }
-      if (n >= 0 && kd.size() > (int) (n * 1.5)) {
-        kd.resize(n);
+    public KeyDistribution getKeyDistribution(int n, int nTables, BlockDistribution lastBd) throws IOException {
+      if (firstValidCG >= 0)
+      {
+        // pick the largest CG as in the row split case
+        return colGroups[getRowSplitCGIndex()].getKeyDistribution(n, nTables, lastBd);
       }
-      return kd;
+      return null;
     }
 
     /**
@@ -650,7 +646,8 @@ public class BasicTable {
      *         construct a TableScanner later. 
      *         
      */
-    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
+    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths,
+        int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
       List<RowSplit> ret;      
       List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths, batchSizes, numBatches);
       int numSlices = cgSplits.size();
@@ -679,6 +676,7 @@ public class BasicTable {
      */
     public int getRowSplitCGIndex() throws IOException {
       // Try to find the largest non-deleted and used column group by projection;
+      // Try to find the largest non-deleted and used column group by projection;
       if (rowSplitCGIndex == -1)
       {
         int largestCGIndex = -1;
@@ -702,7 +700,7 @@ public class BasicTable {
           rowSplitCGIndex = largestCGIndex;
         } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
                                         then we use the first non-deleted column group to do split if it exists. */
-          rowSplitCGIndex = firstValidCG; 
+          rowSplitCGIndex = firstValidCG;
         } 
       } 
       return rowSplitCGIndex;
@@ -844,8 +842,7 @@ public class BasicTable {
      * A row-based split on the zebra table;
      */
     public static class RowSplit implements Writable {
-		
-	int cgIndex;  // column group index where split lies on;
+      int cgIndex;  // column group index where split lies on;
       CGRowSplit slice; 
 
       RowSplit(int cgidx, CGRowSplit split) {
@@ -931,7 +928,7 @@ public class BasicTable {
                        Partition partition) throws IOException {
         init(rowSplit, null, null, null, closeReader, partition);
       }      
-    
+
       /**
        * Creates new CGRowSplit. If the startRow in rowSplit is not set 
        * (i.e. < 0), it sets the startRow and numRows based on 'startByte' 
@@ -943,12 +940,11 @@ public class BasicTable {
         int cgIdx = rowSplit.getCGIndex();
         
         CGRowSplit cgSplit = new CGRowSplit();
-        
+
         // Find the row range :
         if (isCGDeleted(cgIdx)) {
           throw new IOException("CG " + cgIdx + " is deleted.");
         }
-        
         //fill the row numbers.
         colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit);
         return cgSplit;
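
The reworked getKeyDistribution contract is easiest to see from the test changes at the end of
this commit: the caller now passes the number of tables in the union and a BlockDistribution
that collects the bytes of the trailing, not-yet-emitted range. A minimal sketch of a caller,
assuming a BasicTable.Reader 'reader' and an int 'numSplits' as in TestBasicTable below:

    BlockDistribution lastBd = new BlockDistribution();
    // Sample about 10x the desired split count; this reader is the only table (nTables = 1).
    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
    // Bytes not yet attached to a sampled key end up in lastBd, so together they still
    // account for the whole table, as the updated tests assert:
    long covered = keyDistri.length() + lastBd.getLength();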

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java Mon Mar 22 06:26:41 2010
@@ -39,10 +39,10 @@ public class BlockDistribution {
   private long uniqueBytes;
   private Map<String, Long> dataDistri; // map from host names to bytes.
 
-  BlockDistribution() {
+  public BlockDistribution() {
     dataDistri = new HashMap<String, Long>();
   }
-
+  
   void add(long bytes, Map<String, Long> distri) {
     this.uniqueBytes += bytes;
     reduceDataDistri(dataDistri, distri);
@@ -58,7 +58,7 @@ public class BlockDistribution {
       lv.put(key, (sum == null) ? delta : sum + delta);
     }
   }
-
+  
   void add(BlockLocation blkLocation) throws IOException {
     long blkLen = blkLocation.getLength();
     Map<String, Long> tmp = new HashMap<String, Long>();

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Mon Mar 22 06:26:41 2010
@@ -90,7 +90,7 @@ class ColumnGroup {
   private final static String CONF_MIN_SPLIT_SIZE = "table.input.split.minSize";
   private final static int DEFAULT_MIN_SPLIT_SIZE = 64 * 1024;
 
-  private static final double SPLIT_SLOP = 1.1; // 10% slop
+  static final double SPLIT_SLOP = 1.1; // 10% slop
 
   // excluding files start with the following prefix, may change to regex
   private final static String CONF_NON_DATAFILE_PREFIX =
@@ -102,6 +102,9 @@ class ColumnGroup {
   // meta data TFile for entire CG, used as a flag of closed CG
   final static String META_FILE = ".meta";
 
+  // sorted table key ranges for default sorted table split generations
+  private final static String KEY_RANGE_FOR_DEFAULT_SORTED_SPLIT = ".keyrange";
+
   static final String BLOCK_NAME_INDEX = "ColumnGroup.index";
 
   static Path makeMetaFilePath(Path parent) {
@@ -527,23 +530,50 @@ class ColumnGroup {
         FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.names[i]));
         
         BlockLocation[] locations = null;
+        long len = 0;
         if (i == 0) {
-        if (split.startByteFirst != -1)
-          locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, split.numBytesFirst);
+          if (split.startByteFirst != -1)
+          {
+            len = split.numBytesFirst;
+            locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, len);
+          }
         } else if (i == split.length - 1) {
-           if (split.startByteLast != -1)
-             locations = fs.getFileBlockLocations(tfileStatus, split.startByteLast, split.numBytesLast);
+           if (split.numBytesLast != -1)
+           {
+             len = split.numBytesLast;
+             locations = fs.getFileBlockLocations(tfileStatus, 0, len);
+           }
         }
+
         if (locations == null)
-          locations = fs.getFileBlockLocations(tfileStatus, 0, tfileStatus.getLen());
+        {
+          len = tfileStatus.getLen();
+          locations = fs.getFileBlockLocations(tfileStatus, 0, len);
+        }
 
         for (BlockLocation l : locations) {
           ret.add(l);
         }
-      } 
+      }
       return ret;
     }
 
+  private int getStartBlockIndex(long[] startOffsets, long offset)
+  {
+    int index = Arrays.binarySearch(startOffsets, offset);
+    if (index < 0)
+      index = -index - 2;
+    return index;
+  }
+  
+  private int getEndBlockIndex(long[] startOffsets, long offset)
+  {
+    int index = Arrays.binarySearch(startOffsets, offset);
+    if (index < 0)
+      index = -index - 1;
+    return index;
+  }
+
    /**
     * Sets startRow and number of rows in rowSplit based on
     * startOffset and length.
@@ -551,7 +581,7 @@ class ColumnGroup {
     * It is assumed that 'startByte' and 'numBytes' in rowSplit itself
     * are not valid.
     */
-    void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src)throws IOException {
+    void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src) throws IOException {
 
       if (src.names == null || src.length == 0)
         return;
@@ -570,7 +600,6 @@ class ColumnGroup {
       rowSplit.length = src.length;
       rowSplit.startByteFirst = src.startByteFirst;
       rowSplit.numBytesFirst = src.numBytesFirst;
-      rowSplit.startByteLast = src.startByteLast;
       rowSplit.numBytesLast = src.numBytesLast;
 
       Path firstPath = null, lastPath;
@@ -600,7 +629,7 @@ class ColumnGroup {
           throw e;
         }
       }
-      if (src.startByteLast != -1 && rowSplit.length > 1)
+      if (src.numBytesLast != -1 && rowSplit.length > 1)
       {
         lastPath = new Path(path, rowSplit.names[rowSplit.length - 1]);
         if (reader == null || !firstPath.equals(lastPath))
@@ -617,13 +646,8 @@ class ColumnGroup {
           reader = new TFile.Reader(fs.open(lastPath), size, conf);
         }
         try {
-          long startRow = reader.getRecordNumNear(src.startByteLast);
-          long endRow = reader.getRecordNumNear(src.startByteLast + src.numBytesLast);
-
-          if (endRow < startRow)
-            endRow = startRow;
-          rowSplit.startRowLast = startRow;
-          rowSplit.numRowsLast = endRow - startRow;
+          long endRow = reader.getRecordNumNear(src.numBytesLast);
+          rowSplit.numRowsLast = endRow;
         } catch (IOException e) {
           reader.close();
           throw e;
@@ -642,15 +666,45 @@ class ColumnGroup {
      * 
      * @param n
      *          Targeted size of the sampling.
+     * @param nTables
+     *          Number of tables in a union
      * @return KeyDistribution object.
      * @throws IOException
      */
-    public KeyDistribution getKeyDistribution(int n) throws IOException {
+    public KeyDistribution getKeyDistribution(int n, int nTables, BlockDistribution lastBd) throws IOException {
       // TODO: any need for similar capability for unsorted for sorted CGs?
       if (!isSorted()) {
         throw new IOException("Cannot get key distribution for unsorted table");
       }
       KeyDistribution ret = new KeyDistribution(comparator);
+
+      if (n < 0)
+      {
+        /*
+        Path keyRangeFile = new Path(path, KEY_RANGE_FOR_DEFAULT_SORTED_SPLIT);
+        if (fs.exists(keyRangeFile))
+        {
+          try {
+            FSDataInputStream ins = fs.open(keyRangeFile);
+            long minStepSize = ins.readLong();
+            int size = ins.readInt();
+            for (int i = 0; i < size; i++)
+            {
+              BytesWritable keyIn = new BytesWritable();
+              keyIn.readFields(ins);
+              ByteArray key = new ByteArray(keyIn.getBytes());
+              ret.add(key);
+            }
+            ret.setMinStepSize(minStepSize);
+            return ret;
+          } catch (Exception e) {
+            // no-op
+          }
+        }
+        */
+        n = 1;
+      }
+
       Path[] paths = new Path[cgindex.size()];
       FileStatus[] tfileStatus = new FileStatus[paths.length];
       long totalBytes = 0;
@@ -659,48 +713,73 @@ class ColumnGroup {
         tfileStatus[i] = fs.getFileStatus(paths[i]);
         totalBytes += tfileStatus[i].getLen();
       }
-      // variable.
 
-      final long EPSILON = (long) (getMinSplitSize(conf) * (SPLIT_SLOP - 1));
+      final long minSize = getMinSplitSize(conf);
+      final long EPSILON = (long) (minSize * (SPLIT_SLOP - 1));
       long goalSize = totalBytes / n;
-      goalSize = Math.max(getMinSplitSize(conf), goalSize);
+      long batchSize = 0;
+      BlockDistribution bd = new BlockDistribution();;
+      RawComparable prevKey = null;
+
+      long minStepSize = -1;
+      FSDataInputStream nextFsdis = null;
+      TFile.Reader nextReader = null;
       for (int i = 0; i < paths.length; ++i) {
         FileStatus fstatus = tfileStatus[i];
         long blkSize = fstatus.getBlockSize();
         long fileLen = fstatus.getLen();
-        long stepSize =
-            (goalSize > blkSize) ? goalSize / blkSize * blkSize : blkSize
-                / (blkSize / goalSize);
+        long stepSize = Math.max(minSize,
+            (goalSize < blkSize) ? goalSize : blkSize);
+        if (minStepSize== -1 || minStepSize > stepSize)
+          minStepSize = stepSize;
+        // adjust the block size by the scaling factor
+        blkSize /= nTables;
+        stepSize = Math.max(minSize,
+          (goalSize < blkSize) ? goalSize : blkSize);
         FSDataInputStream fsdis = null;
         TFile.Reader reader = null;
         long remainLen = fileLen;
-        boolean done = false;
         try {
-          fsdis = fs.open(paths[i]);
-          reader = new TFile.Reader(fsdis, tfileStatus[i].getLen(), conf);
+          if (nextReader == null)
+          {
+            fsdis = fs.open(paths[i]);
+            reader = new TFile.Reader(fsdis, fileLen, conf);
+          } else {
+            fsdis = nextFsdis;
+            reader = nextReader;
+          }
+          BlockLocation[] locations =
+              fs.getFileBlockLocations(fstatus, 0, fileLen);
+          if (locations.length == 0) {
+            throw new AssertionError(
+                "getFileBlockLocations returns 0 location");
+          }
+
+          Arrays.sort(locations, new Comparator<BlockLocation>() {
+            @Override
+            public int compare(BlockLocation o1, BlockLocation o2) {
+              long diff = o1.getOffset() - o2.getOffset();
+              if (diff < 0) return -1;
+              if (diff > 0) return 1;
+              return 0;
+            }
+          });
+          
+          long[] startOffsets = new long[locations.length];
+
+          for (int ii = 0; ii < locations.length; ii++)
+            startOffsets[ii] = locations[ii].getOffset();
+
+          boolean done = false;
           while ((remainLen > 0) && !done) {
             long splitBytes =
-                (remainLen > stepSize * SPLIT_SLOP) ? stepSize : remainLen;
+                remainLen > stepSize ? stepSize : remainLen;
             long offsetBegin = fileLen - remainLen;
             long offsetEnd = offsetBegin + splitBytes;
-            BlockLocation[] locations =
-                fs.getFileBlockLocations(fstatus, offsetBegin, splitBytes);
-            if (locations.length == 0) {
-              throw new AssertionError(
-                  "getFileBlockLocations returns 0 location");
-            }
-
-            Arrays.sort(locations, new Comparator<BlockLocation>() {
-              @Override
-              public int compare(BlockLocation o1, BlockLocation o2) {
-                long diff = o1.getOffset() - o2.getOffset();
-                if (diff < 0) return -1;
-                if (diff > 0) return 1;
-                return 0;
-              }
-            });
-            BlockLocation firstBlock = locations[0];
-            BlockLocation lastBlock = locations[locations.length - 1];
+            int indexBegin = getStartBlockIndex(startOffsets, offsetBegin);
+            int indexEnd = getEndBlockIndex(startOffsets, offsetEnd);
+            BlockLocation firstBlock = locations[indexBegin];
+            BlockLocation lastBlock = locations[indexEnd-1];
             long lastBlockOffsetBegin = lastBlock.getOffset();
             long lastBlockOffsetEnd =
                 lastBlockOffsetBegin + lastBlock.getLength();
@@ -719,6 +798,7 @@ class ColumnGroup {
             	// only if this is not the last chunk
                 offsetEnd = lastBlockOffsetBegin;
                 splitBytes = offsetEnd - offsetBegin;
+                indexEnd--;
               }
             }
             else if ((lastBlockOffsetEnd > offsetEnd)
@@ -732,22 +812,44 @@ class ColumnGroup {
             if (key == null) {
               offsetEnd = fileLen;
               splitBytes = offsetEnd - offsetBegin;
-              key = reader.getLastKey();
+              if (i < paths.length-1)
+              {
+                nextFsdis = fs.open(paths[i+1]);
+                nextReader = new TFile.Reader(nextFsdis, tfileStatus[i+1].getLen(), conf);
+                key = nextReader.getFirstKey();
+              }
               done = true; // TFile index too large? Is it necessary now?
             }
             remainLen -= splitBytes;
+            batchSize += splitBytes;
 
-            BlockDistribution bd = new BlockDistribution();
-            for (BlockLocation l : locations) {
-              long blkBeginOffset = l.getOffset();
-              long blkEndOffset = blkBeginOffset + l.getLength();
-              if (blkBeginOffset < offsetBegin) blkBeginOffset = offsetBegin;
-              if (blkEndOffset > offsetEnd) blkEndOffset = offsetEnd;
-              if (blkEndOffset > blkBeginOffset) {
-                bd.add(l, blkEndOffset - blkBeginOffset);
+            if (key != null && batchSize >= stepSize)
+            {
+              if (batchSize - splitBytes < EPSILON || splitBytes < EPSILON)
+              {
+                // the last chunk or this chunk is small enough to create a new range for this key
+                setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+                ret.add(key, bd);
+                batchSize = 0;
+                bd = new BlockDistribution();
+              } else {
+                ret.add(prevKey, bd);
+                batchSize = splitBytes;
+                bd = new BlockDistribution();
+                if (batchSize >= stepSize)
+                {
+                  setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+                  ret.add(key, bd);
+                  batchSize = 0;
+                  bd = new BlockDistribution();
+                } else {
+                  setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+                }
               }
+            } else {
+              setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
             }
-            ret.add(key, bd);
+            prevKey = key;
           }
         }
         finally {
@@ -769,10 +871,47 @@ class ColumnGroup {
           }
         }
       }
-
+      if (lastBd != null)
+        lastBd.add(bd);
+      ret.setMinStepSize(minStepSize);
+      
       return ret;
     }
 
+    private void setBlockDistribution(BlockDistribution bd, TFile.Reader reader,
+        BlockLocation[] locations, FileStatus fileStatus, long[] startOffsets,
+        RawComparable begin, RawComparable end) throws IOException
+    {
+      long beginOffset, endOffset = -1;
+      if (begin == null)
+        beginOffset = 0;
+      else
+        beginOffset = reader.getOffsetForKey(begin);
+      if (end != null)
+      {
+        if (begin == null)
+          begin = reader.getFirstKey();
+        /* Only if the key range is empty. This is needed because TFile has a 16-byte
+         * Magic that causes getOffsetForKey to return 16 (not 0) even on the first key.
+         */
+        if (comparator.compare(begin, end) != 0)
+          endOffset = reader.getOffsetForKey(end);
+      }
+      int startBlockIndex = (beginOffset == 0 ? 0 : getStartBlockIndex(startOffsets, beginOffset));
+      BlockLocation l;
+      int endBlockIndex = (end == null ? locations.length : endOffset == -1 ?
+          startBlockIndex : getEndBlockIndex(startOffsets, endOffset));
+      for (int ii = startBlockIndex; ii < endBlockIndex; ii++) {
+        l = locations[ii];
+        long blkBeginOffset = l.getOffset();
+        long blkEndOffset = blkBeginOffset + l.getLength();
+        if (blkEndOffset > blkBeginOffset) {
+          bd.add(l, blkEndOffset - blkBeginOffset);
+        }
+      }
+      return;
+    }
+
     /**
      * Get the status of the ColumnGroup.
      */
@@ -807,7 +946,6 @@ class ColumnGroup {
         lst.add(new CGRangeSplit(beginIndex, endIndex - beginIndex));
         beginIndex = endIndex;
       }
-
       return lst;
     }
 
@@ -822,21 +960,35 @@ class ColumnGroup {
      * @return A list of CGRowSplit objects. 
      *         
      */
-    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int[] batches, int numBatches) throws IOException {
+    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths,
+        int[] batches, int numBatches) throws IOException {
       List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
       CGRowSplit cgRowSplit;
-      long startFirst, bytesFirst, startLast, bytesLast;
+      long startFirst, bytesFirst, bytesLast;
       int length;
        
-      if (numBatches > 0 && cgindex == null)
+      if (numBatches == 0)
+      {
+        cgRowSplit = new CGRowSplit(null, null, 0, -1, 0, 0);
+        lst.add(cgRowSplit);
+        return lst;
+      }
+
+      if (cgindex == null)
         cgindex = buildIndex(fs, this.path, dirty, conf);
 
+      if (cgindex.size() == 0)
+      {
+        cgRowSplit = new CGRowSplit(null, null, 0, -1, 0, 0);
+        lst.add(cgRowSplit);
+        return lst;
+      }
+
       for (int i=0; i< numBatches; i++) {
         int indexFirst = batches[i];
         int indexLast = batches[i+1] - 1;
         startFirst = starts[indexFirst];
         bytesFirst = lengths[indexFirst];
-        startLast = starts[indexLast];
         bytesLast = lengths[indexLast];
         length = batches[i+1] - batches[i];
         String[] namesInSplit = new String[length];
@@ -846,8 +998,8 @@ class ColumnGroup {
           namesInSplit[j] = paths[indexFirst+j].getName();
           sizesInSplit[j] = cgindex.get(cgindex.getFileIndex(paths[indexFirst+j])).bytes;
         }
-        cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, fs, conf, length, 
-                startFirst, bytesFirst, startLast, bytesLast);
+        cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, length, 
+                startFirst, bytesFirst, bytesLast);
         lst.add(cgRowSplit);
       }
       
@@ -912,9 +1064,8 @@ class ColumnGroup {
             if (first && rowRange.startByteFirst != -1)
               scanner = reader.createScannerByRecordNum(rowRange.startRowFirst, 
                                               rowRange.startRowFirst + rowRange.numRowsFirst);
-            else if (last && rowRange.startByteLast != -1)
-              scanner = reader.createScannerByRecordNum(rowRange.startRowLast, 
-                  rowRange.startRowLast + rowRange.numRowsLast);
+            else if (last && rowRange.numBytesLast != -1)
+              scanner = reader.createScannerByRecordNum(0, rowRange.numRowsLast);
             else
               scanner = reader.createScanner();
           } else {
@@ -977,6 +1128,7 @@ class ColumnGroup {
         DataInputStream dis = scanner.entry().getValueStream();
         try {
           tupleReader.get(dis, val);
+          
         }
         finally {
           dis.close();
@@ -1349,16 +1501,13 @@ class ColumnGroup {
       long numBytesFirst;
       long startRowFirst = -1;
       long numRowsFirst = -1;
-      long startByteLast = -1;
-      long numBytesLast;
-      long startRowLast = -1;
+      long numBytesLast = -1;
       long numRowsLast = -1;
       String[] names;
       long[] sizes = null;
 
-      CGRowSplit(String[] names, long[] sizes, FileSystem fs, Configuration conf,
-          int length, long startFirst, long bytesFirst,
-          long startLast, long bytesLast) throws IOException {
+      CGRowSplit(String[] names, long[] sizes, int length, long startFirst, long bytesFirst,
+          long bytesLast) throws IOException {
         this.names = names;
         this.sizes = sizes;
         this.length = length;
@@ -1368,9 +1517,8 @@ class ColumnGroup {
           startByteFirst = startFirst;
           numBytesFirst = bytesFirst;
         }
-        if (startLast != -1 && this.length > 1)
+        if (bytesLast != -1 && this.length > 1)
         {
-          startByteLast = startLast;
           numBytesLast = bytesLast;
         }
       }
@@ -1392,9 +1540,7 @@ class ColumnGroup {
         sb.append("{numBytesFirst = " + numBytesFirst + "}\n");
         sb.append("{startRowFirst = " + startRowFirst + "}\n");
         sb.append("{numRowsFirst = " + numRowsFirst + "}\n");
-        sb.append("{startByteLast = " + startByteLast + "}\n");
         sb.append("{numBytesLast = " + numBytesLast + "}\n");
-        sb.append("{startRowLast = " + startRowLast + "}\n");
         sb.append("{numRowsLast = " + numRowsLast + "}\n");
         
         return sb.toString();
@@ -1417,9 +1563,7 @@ class ColumnGroup {
         numBytesFirst = Utils.readVLong(in);
         startRowFirst = Utils.readVLong(in);
         numRowsFirst = Utils.readVLong(in);
-        startByteLast = Utils.readVLong(in);
         numBytesLast = Utils.readVLong(in);
-        startRowLast = Utils.readVLong(in);
         numRowsLast = Utils.readVLong(in);
       }
 
@@ -1435,9 +1579,7 @@ class ColumnGroup {
         Utils.writeVLong(out, numBytesFirst);
         Utils.writeVLong(out, startRowFirst);
         Utils.writeVLong(out, numRowsFirst);
-        Utils.writeVLong(out, startByteLast);
         Utils.writeVLong(out, numBytesLast);
-        Utils.writeVLong(out, startRowLast);
         Utils.writeVLong(out, numRowsLast);
       }
     }
@@ -1876,7 +2018,6 @@ class ColumnGroup {
           out.close();
           out = null;
           // do renaming only if all the above is successful.
-//          fs.rename(new Path(path, tmpName), new Path(path, name));
           fs.rename(new Path(path, tmpName), new Path(finalOutputPath, name));
 
 /*
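
The two new index helpers above lean on the Arrays.binarySearch convention of returning
-(insertionPoint) - 1 for a missing key. A small self-contained illustration of that arithmetic,
with hypothetical block start offsets (not Zebra code, just the same lookup rule in isolation):

    import java.util.Arrays;

    public class BlockIndexDemo {
      // Same arithmetic as the new ColumnGroup.getStartBlockIndex:
      // the block whose start offset is the largest one <= offset.
      static int getStartBlockIndex(long[] startOffsets, long offset) {
        int index = Arrays.binarySearch(startOffsets, offset);
        if (index < 0)
          index = -index - 2;   // insertion point minus one
        return index;
      }

      // Same arithmetic as the new ColumnGroup.getEndBlockIndex:
      // the first block whose start offset is >= offset.
      static int getEndBlockIndex(long[] startOffsets, long offset) {
        int index = Arrays.binarySearch(startOffsets, offset);
        if (index < 0)
          index = -index - 1;   // insertion point
        return index;
      }

      public static void main(String[] args) {
        long[] starts = { 0L, 128L, 256L, 384L };             // hypothetical block boundaries
        System.out.println(getStartBlockIndex(starts, 200));  // 1: offset 200 lies in block [128, 256)
        System.out.println(getEndBlockIndex(starts, 200));    // 2: exclusive end for that range
        System.out.println(getStartBlockIndex(starts, 256));  // 2: exact boundary hit
        System.out.println(getEndBlockIndex(starts, 256));    // 2
      }
    }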

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Mon Mar 22 06:26:41 2010
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.io.IOException;
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.zebra.tfile.RawComparable;
@@ -33,50 +34,37 @@ import org.apache.hadoop.zebra.tfile.Byt
  */
 public class KeyDistribution {
   private long uniqueBytes;
+  private long minStepSize = -1;
   private SortedMap<RawComparable, BlockDistribution> data;
 
   KeyDistribution(Comparator<? super RawComparable> comparator) {
     data = new TreeMap<RawComparable, BlockDistribution>(comparator);
   }
 
-  void add(RawComparable key, BlockDistribution bucket) {
+  void add(RawComparable key) {
+    data.put(key, null);
+  }
+  
+  void add(RawComparable key, BlockDistribution bucket)
+  {
     uniqueBytes += bucket.getLength();
     data.put(key, BlockDistribution.sum(data.get(key), bucket));
   }
-
-  void add(KeyDistribution other) {
-    this.uniqueBytes += other.uniqueBytes;
-    reduceKeyDistri(this.data, other.data);
-  }
-
-  static void reduceKeyDistri(SortedMap<RawComparable, BlockDistribution> lv,
-      SortedMap<RawComparable, BlockDistribution> rv) {
-    for (Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
-        rv.entrySet().iterator(); it.hasNext();) {
-      Map.Entry<RawComparable, BlockDistribution> e = it.next();
-      RawComparable key = e.getKey();
-      BlockDistribution sum = lv.get(key);
-      BlockDistribution delta = e.getValue();
-      lv.put(key, BlockDistribution.sum(sum, delta));
-    }
+  
+  void setMinStepSize(long minStepSize)
+  {
+    this.minStepSize = minStepSize;
   }
 
   /**
-   * Aggregate two key distributions.
+   * Get the total unique bytes contained in the key-partitioned buckets.
    * 
-   * @param a
-   *          first key distribution (can be null)
-   * @param b
-   *          second key distribution (can be null)
-   * @return the aggregated key distribution.
+   * @return The total number of bytes contained in the key-partitioned buckets.
    */
-  public static KeyDistribution sum(KeyDistribution a, KeyDistribution b) {
-    if (a == null) return b;
-    if (b == null) return a;
-    a.add(b);
-    return a;
+  public long length() {
+    return uniqueBytes;
   }
-  
+
   /**
    * Get the size of the key sampling.
    * 
@@ -87,74 +75,160 @@ public class KeyDistribution {
   }
 
   /**
-   * Get the total unique bytes contained in the key-partitioned buckets.
+   * Get the minimum split step size from all tables in union
+   */
+  public long getMinStepSize() {
+    return minStepSize;
+  }
+ 
+  /** Get the list of sampling keys
    * 
-   * @return The total number of bytes contained in the key-partitioned buckets.
+   * @return A list of sampling keys
    */
-  public long length() {
-    return uniqueBytes;
+  public RawComparable[] getKeys() {
+    RawComparable[] ret = new RawComparable[data.size()];
+    return data.keySet().toArray(ret);
   }
-
+  
+  public BlockDistribution getBlockDistribution(RawComparable key) {
+    return data.get(key);
+  }
+  
   /**
-   * Resize the key samples
+   * Merge the key samples
    * 
-   * @param n
-   *          targeted sampling size
-   * @return the actual size after the resize().
-   */
-  public int resize(int n) {
-    Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
-        data.entrySet().iterator();
-    KeyDistribution adjusted = new KeyDistribution(data.comparator());
-    for (int i = 0; i < n; ++i) {
-      long targetMarker = (i + 1) * uniqueBytes / n;
-      if (adjusted.uniqueBytes >= targetMarker) {
-        continue;
+   * Algorithm: select the smallest key from all clean source ranges and ranges subsequent to
+   *            respective dirty ranges. A dirty range is a range that has been partially needed
+   *            by one or more of the previous final ranges.  
+   *
+   * @param sourceKeys
+   *          key samples to be merged
+   * @return the merged key samples
+   */
+  public static KeyDistribution merge(KeyDistribution[] sourceKeys) throws IOException {
+    if (sourceKeys == null || sourceKeys.length == 0)
+      return null;
+    int srcSize = sourceKeys.length;
+    if (srcSize == 1)
+      return sourceKeys[0];
+    
+    Comparator<? super RawComparable> comp = sourceKeys[0].data.comparator();
+    // TODO check the identical comparators used in the source keys
+    /*
+    for (int i = 1; i < srcSize; i++)
+      if (!comp.equals(sourceKeys[i].data.comparator()))
+        throw new IOException("Incompatible sort keys found:" + comp.toString() + " vs. "+ sourceKeys[i].data.comparator().toString());
+     */
+    
+    KeyDistribution result = new KeyDistribution(comp);
+    
+    result.minStepSize = sourceKeys[0].minStepSize;
+    for (int i = 1; i < srcSize; i++)
+      if (result.minStepSize > sourceKeys[i].minStepSize)
+        result.minStepSize = sourceKeys[i].minStepSize;
+    
+    RawComparable[][] its = new RawComparable[srcSize][];
+    for (int i = 0; i < srcSize; i++)
+      its[i] = sourceKeys[i].getKeys();
+    RawComparable min, current;
+    int minIndex = -1;
+    int[] index = new int[srcSize];
+    boolean[] dirty = new boolean[srcSize];
+    while (true)
+    {
+      min = null;
+      BlockDistribution bd = new BlockDistribution();
+      for (int i = 0; i < srcSize; i++)
+      {
+        if (index[i] >= its[i].length)
+          continue;
+        current = its[i][index[i]];
+        bd.add(sourceKeys[i].getBlockDistribution(current));
+        if (min == null || comp.compare(min, current) > 0)
+        {
+          min = current;
+          minIndex = i;
+        }
       }
-      RawComparable key = null;
-      do {
-        Map.Entry<RawComparable, BlockDistribution> e = it.next();
-        if (key == null) {
-          key = e.getKey();
+      if (min == null)
+        break;
+
+      result.add(min, bd);
+      for (int i = 0; i < srcSize; i++)
+      {
+        if (index[i] >= its[i].length)
+          continue;
+        current = its[i][index[i]];
+        if (i != minIndex)
+        {
+          if (comp.compare(min, current) != 0)
+          {
+            if (!dirty[i])
+            {
+              dirty[i] = true;
+              index[i]++;
+            } else if (comp.compare(min, its[i][index[i] - 1]) > 0 )
+              index[i]++;
+          } else {
+            if (dirty[i])
+              dirty[i] = false;
+            index[i]++;
+          }
+        } else {
+          if (dirty[i])
+            dirty[i] = false;
+          index[i]++;
         }
-        adjusted.add(key, e.getValue());
       }
-      while (adjusted.uniqueBytes < targetMarker);
     }
-
+    return result;
+  }
+  
+  public int resize(BlockDistribution lastBd)
+  {
+    Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
+      data.entrySet().iterator();
+    KeyDistribution adjusted = new KeyDistribution(data.comparator());
+    long realSize = 0, mySize = 0;
+    RawComparable key = null;
+    BlockDistribution bd = null, bd0 = null;
+    while (it.hasNext())
+    {
+      Map.Entry<RawComparable, BlockDistribution> mapEntry = it.next();
+      bd0 = mapEntry.getValue();
+      mySize = bd0.getLength();
+      if (realSize >= minStepSize/2 ||
+          (realSize + mySize >= minStepSize*ColumnGroup.SPLIT_SLOP && 
+              realSize >= minStepSize * (ColumnGroup.SPLIT_SLOP-1)))
+      {
+        adjusted.add(key, bd);
+        bd = null;
+        realSize = 0;
+      }
+      key = mapEntry.getKey();
+      realSize += mySize;
+      bd = BlockDistribution.sum(bd, bd0);
+    }
+    if (bd != null)
+    {
+      realSize += lastBd.getLength();
+      if (realSize >= minStepSize/2 || adjusted.size() == 0)
+      {
+         // the last plus would contain more than liked, don't merge them.
+        adjusted.add(key, bd);
+      } else
+        BlockDistribution.sum(lastBd, bd);
+    }
     swap(adjusted);
     return data.size();
   }
   
-  void swap(KeyDistribution other) {
-    long tmp = uniqueBytes;
-    uniqueBytes = other.uniqueBytes;
-    other.uniqueBytes = tmp;
+  private void swap(KeyDistribution other) {
+    long tmp = minStepSize;
+    minStepSize = other.minStepSize;
+    other.minStepSize = tmp;
     SortedMap<RawComparable, BlockDistribution> tmp2 = data;
     data = other.data;
     other.data = tmp2;
   }
-  
-  /**
-   * Get the list of sampling keys.
-   * 
-   * @return A list of sampling keys.
-   */
-  public RawComparable[] getKeys() {
-    RawComparable[] ret = new RawComparable[data.size()];
-    return data.keySet().toArray(ret);
-
-  }
-
-  /**
-   * Get the block distribution of all data that maps to the key bucket.
-   */
-  public BlockDistribution getBlockDistribution(BytesWritable key) {
-    ByteArray key0 = new ByteArray(key.getBytes(), 0, key.getLength());
-    BlockDistribution bInfo = data.get(key0);
-    if (bInfo == null) {
-      throw new IllegalArgumentException("Invalid key");
-    }
-    return bInfo;
-  }
 }
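
resize() no longer targets a fixed sample count; it walks the sampled key buckets in order and
only emits a range once the accumulated bytes reach about half of minStepSize (with the same 10%
SPLIT_SLOP used by ColumnGroup), folding a too-small tail into lastBd. A deliberately simplified,
standalone sketch of that grouping rule, using plain byte counts instead of BlockDistribution
objects and dropping the lastBd tail handling:

    import java.util.ArrayList;
    import java.util.List;

    public class CoalesceDemo {
      static final double SPLIT_SLOP = 1.1;  // same 10% slop as ColumnGroup.SPLIT_SLOP

      // Merge adjacent bucket sizes into ranges of at least ~minStepSize/2 bytes.
      static List<Long> coalesce(long[] bucketSizes, long minStepSize) {
        List<Long> ranges = new ArrayList<Long>();
        long accumulated = 0;
        for (long size : bucketSizes) {
          if (accumulated >= minStepSize / 2
              || (accumulated + size >= minStepSize * SPLIT_SLOP
                  && accumulated >= minStepSize * (SPLIT_SLOP - 1))) {
            ranges.add(accumulated);   // close the current range before starting a new one
            accumulated = 0;
          }
          accumulated += size;
        }
        if (accumulated > 0) {
          ranges.add(accumulated);     // the real code may fold this tail into lastBd instead
        }
        return ranges;
      }

      public static void main(String[] args) {
        long[] sizes = { 10, 10, 10, 10, 10, 10 };   // hypothetical bucket sizes in bytes
        System.out.println(coalesce(sizes, 25));     // prints [20, 20, 20]
      }
    }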

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Mon Mar 22 06:26:41 2010
@@ -382,7 +382,7 @@ public class TableInputFormat implements
 		 {
 			 throw new IOException("The table is not properly sorted");
 		 }
-	 } else {
+    } else {
 		 List<LeafTableInfo> leaves = expr.getLeafTables(null);
 		 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
 		 {
@@ -406,8 +406,8 @@ public class TableInputFormat implements
        }
 		 }
 	 }
-	 // need key range input splits for sorted table union
-	 setSorted(conf);
+    // need key range input splits for sorted table union
+    setSorted(conf);
   }
   
   /**
@@ -509,7 +509,7 @@ public class TableInputFormat implements
       BlockDistribution bd = null;
       for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
         BasicTable.Reader reader = it.next();
-        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit) null));
       }
       
       SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
@@ -519,44 +519,49 @@ public class TableInputFormat implements
     // TODO: Does it make sense to interleave keys for all leaf tables if
     // numSplits <= 0 ?
     int nLeaves = readers.size();
-    KeyDistribution keyDistri = null;
+    BlockDistribution lastBd = new BlockDistribution();
+    ArrayList<KeyDistribution> btKeyDistributions = new ArrayList<KeyDistribution>();
     for (int i = 0; i < nLeaves; ++i) {
       KeyDistribution btKeyDistri =
           readers.get(i).getKeyDistribution(
               (numSplits <= 0) ? -1 :
-              Math.max(numSplits * 5 / nLeaves, numSplits));
-      keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+              Math.max(numSplits * 5 / nLeaves, numSplits), nLeaves, lastBd);
+      btKeyDistributions.add(btKeyDistri);
     }
+    int btSize = btKeyDistributions.size();
+    KeyDistribution[] btKds = new KeyDistribution[btSize];
+    Object[] btArray = btKeyDistributions.toArray();
+    for (int i = 0; i < btSize; i++)
+      btKds[i] = (KeyDistribution) btArray[i];
     
+    KeyDistribution keyDistri = KeyDistribution.merge(btKds);
+
     if (keyDistri == null) {
       // should never happen.
       SortedTableSplit split = new SortedTableSplit(null, null, null, conf);
       return new InputSplit[] { split };
     }
     
-    if (numSplits > 0) {
-      keyDistri.resize(numSplits);
-    }
-    
-    RawComparable[] rawKeys = keyDistri.getKeys();
-    BytesWritable[] keys = new BytesWritable[rawKeys.length];
-    for (int i=0; i<keys.length; ++i) {
-      RawComparable rawKey = rawKeys[i];
-      keys[i] = new BytesWritable();
-      keys[i].setSize(rawKey.size());
-      System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].get(), 0,
-          rawKey.size());
-    }
+    keyDistri.resize(lastBd);
     
-    // TODO: Should we change to RawComparable to avoid the creation of
-    // BytesWritables?
-    for (int i = 0; i < keys.length; ++i) {
-      BytesWritable begin = (i == 0) ? null : keys[i - 1];
-      BytesWritable end = (i == keys.length - 1) ? null : keys[i];
-      BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
-      SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+    RawComparable[] keys = keyDistri.getKeys();
+    for (int i = 0; i <= keys.length; ++i) {
+      RawComparable begin = (i == 0) ? null : keys[i - 1];
+      RawComparable end = (i == keys.length) ? null : keys[i];
+      BlockDistribution bd;
+      if (i < keys.length)
+        bd = keyDistri.getBlockDistribution(keys[i]);
+      else
+        bd = lastBd;
+      BytesWritable beginB = null, endB = null;
+      if (begin != null)
+        beginB = new BytesWritable(begin.buffer());
+      if (end != null)
+        endB = new BytesWritable(end.buffer());
+      SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
       splits.add(split);
     }
+
     return splits.toArray(new InputSplit[splits.size()]);
   }
   

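Taken together, the sorted-split path in this file now reads roughly as below (condensed;
readers, numSplits, conf and splits are assumed from the surrounding split-generation method,
and the null check on the merged distribution is omitted). The loop runs one step past the key
array so that the trailing range accumulated in lastBd also becomes a split; the mapreduce
variant further down makes the same change:

    BlockDistribution lastBd = new BlockDistribution();
    KeyDistribution[] btKds = new KeyDistribution[readers.size()];
    for (int i = 0; i < readers.size(); ++i) {
      // Each table samples only its largest column group, scaled by the union size.
      btKds[i] = readers.get(i).getKeyDistribution(
          (numSplits <= 0) ? -1 : Math.max(numSplits * 5 / readers.size(), numSplits),
          readers.size(), lastBd);
    }
    KeyDistribution keyDistri = KeyDistribution.merge(btKds);  // replaces KeyDistribution.sum
    keyDistri.resize(lastBd);                                  // coalesce buckets by minStepSize

    RawComparable[] keys = keyDistri.getKeys();
    for (int i = 0; i <= keys.length; ++i) {
      RawComparable begin = (i == 0) ? null : keys[i - 1];
      RawComparable end = (i == keys.length) ? null : keys[i];
      BlockDistribution bd =
          (i < keys.length) ? keyDistri.getBlockDistribution(keys[i]) : lastBd;
      splits.add(new SortedTableSplit(
          begin == null ? null : new BytesWritable(begin.buffer()),
          end == null ? null : new BytesWritable(end.buffer()),
          bd, conf));
    }
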
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Mon Mar 22 06:26:41 2010
@@ -53,16 +53,17 @@ public class TableRecordReader implement
   public TableRecordReader(TableExpr expr, String projection,
       InputSplit split,
       JobConf conf) throws IOException, ParseException {
-    if (expr.sortedSplitRequired()) {
+    if (split != null && split instanceof RowTableSplit) {
+      RowTableSplit rowSplit = (RowTableSplit) split;
+      if (!expr.sortedSplitRequired() && Projection.getVirtualColumnIndices(projection) != null)
+        throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
+      scanner = expr.getScanner(rowSplit, projection, conf);
+    } else if (expr.sortedSplitRequired()) {
       SortedTableSplit tblSplit = (SortedTableSplit) split;
       scanner =
           expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
               conf);
-    } else if (split != null && split instanceof RowTableSplit) {
-      RowTableSplit rowSplit = (RowTableSplit) split;
-      scanner = expr.getScanner(rowSplit, projection, conf);
-    }
-    else {
+    } else {
       UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
       scanner = expr.getScanner(tblSplit, projection, conf);
     }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Mon Mar 22 06:26:41 2010
@@ -546,7 +546,7 @@ public class TableInputFormat extends In
       BlockDistribution bd = null;
       for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
         BasicTable.Reader reader = it.next();
-        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit) null));
       }
       
       SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
@@ -557,41 +557,46 @@ public class TableInputFormat extends In
     // TODO: Does it make sense to interleave keys for all leaf tables if
     // numSplits <= 0 ?
     int nLeaves = readers.size();
-    KeyDistribution keyDistri = null;
+    BlockDistribution lastBd = new BlockDistribution();
+    ArrayList<KeyDistribution> btKeyDistributions = new ArrayList<KeyDistribution>();
     for (int i = 0; i < nLeaves; ++i) {
       KeyDistribution btKeyDistri =
           readers.get(i).getKeyDistribution(
               (numSplits <= 0) ? -1 :
-              Math.max(numSplits * 5 / nLeaves, numSplits));
-      keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+              Math.max(numSplits * 5 / nLeaves, numSplits), nLeaves, lastBd);
+      btKeyDistributions.add(btKeyDistri);
     }
     
+    int btSize = btKeyDistributions.size();
+    KeyDistribution[] btKds = new KeyDistribution[btSize];
+    Object[] btArray = btKeyDistributions.toArray();
+    for (int i = 0; i < btSize; i++)
+      btKds[i] = (KeyDistribution) btArray[i];
+
+    KeyDistribution keyDistri = KeyDistribution.merge(btKds);
+
     if (keyDistri == null) {
       // should never happen.
        return splits;
     }
-    
-    if (numSplits > 0) {
-      keyDistri.resize(numSplits);
-    }
-    
-    RawComparable[] rawKeys = keyDistri.getKeys();
-    BytesWritable[] keys = new BytesWritable[rawKeys.length];
-    for (int i=0; i<keys.length; ++i) {
-      RawComparable rawKey = rawKeys[i];
-      keys[i] = new BytesWritable();
-      keys[i].setSize(rawKey.size());
-      System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].getBytes(), 0,
-          rawKey.size());
-    }
-    
-    // TODO: Should we change to RawComparable to avoid the creation of
-    // BytesWritables?
-    for (int i = 0; i < keys.length; ++i) {
-      BytesWritable begin = (i == 0) ? null : keys[i - 1];
-      BytesWritable end = (i == keys.length - 1) ? null : keys[i];
-      BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
-      SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+
+    keyDistri.resize(lastBd);
+
+    RawComparable[] keys = keyDistri.getKeys();
+    for (int i = 0; i <= keys.length; ++i) {
+      RawComparable begin = (i == 0) ? null : keys[i - 1];
+      RawComparable end = (i == keys.length) ? null : keys[i];
+      BlockDistribution bd;
+      if (i < keys.length)
+        bd = keyDistri.getBlockDistribution(keys[i]);
+      else
+        bd = lastBd;
+      BytesWritable beginB = null, endB = null;
+      if (begin != null)
+        beginB = new BytesWritable(begin.buffer());
+      if (end != null)
+        endB = new BytesWritable(end.buffer());
+      SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
       splits.add(split);
     }
     return splits;

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java Mon Mar 22 06:26:41 2010
@@ -1059,6 +1059,25 @@ public class TFile {
       return new ByteArray(tfileIndex.getEntry(blockIndex).key);
     }
 
+    public long getOffsetForKey(RawComparable key) throws IOException {
+      Location l = getBlockContainsKey(key, false);
+      int blockIndex = l.getBlockIndex();
+      if (blockIndex == end.blockIndex)
+      {
+        if (blockIndex > 0)
+        {
+          BCFile.Reader.BlockReader blkReader = readerBCF.getDataBlock(blockIndex - 1);
+          return blkReader.getStartPos() + blkReader.getCompressedSize();
+        } else
+          return 0;
+      } else
+        return readerBCF.getDataBlock(blockIndex).getStartPos();
+    }
+    
+    public long getLastDataOffset() throws IOException {
+      BCFile.Reader.BlockReader blkReader = readerBCF.getDataBlock(end.blockIndex - 1);
+      return blkReader.getStartPos() + blkReader.getCompressedSize();
+    }
     /**
      * Get a scanner than can scan the whole TFile.
      * 

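The new Reader methods are what ColumnGroup.setBlockDistribution (earlier in this commit) uses
to turn a pair of sampled keys into a block-aligned byte range. A rough, simplified sketch of
that use; reader, begin, end and fileLen are assumed from the surrounding split-generation code,
and the empty-range special case is left out:

    // Start offset of the data block that holds 'begin' (0 when the range starts at the file head).
    long beginOffset = (begin == null) ? 0 : reader.getOffsetForKey(begin);
    // Block-aligned end of the range; open-ended ranges simply run to the end of the file.
    long endOffset = (end == null) ? fileLen : reader.getOffsetForKey(end);
    // The block-index helpers in ColumnGroup then charge the HDFS blocks that overlap
    // [beginOffset, endOffset) to this key range's BlockDistribution.
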
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Mon Mar 22 06:26:41 2010
@@ -170,13 +170,14 @@ public class TestBasicTable {
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     reader.setProjection(strProjection);
     long totalBytes = reader.getStatus().getSize();
-    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
-    Assert.assertEquals(totalBytes, keyDistri.length());
+    BlockDistribution lastBd = new BlockDistribution();
+    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+    Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
     reader.close();
     BytesWritable[] keys = null;
     if (keyDistri.size() >= numSplits) {
-      keyDistri.resize(numSplits);
-      Assert.assertEquals(totalBytes, keyDistri.length());
+      keyDistri.resize(lastBd);
+      Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
       RawComparable[] rawComparables = keyDistri.getKeys();
       keys = new BytesWritable[rawComparables.length];
       for (int i = 0; i < keys.length; ++i) {

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Mon Mar 22 06:26:41 2010
@@ -285,13 +285,14 @@ public class TestColumnGroup {
     ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
     reader.setProjection(strProjection);
     long totalBytes = reader.getStatus().getSize();
-    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
-    Assert.assertEquals(totalBytes, keyDistri.length());
+    BlockDistribution lastBd = new BlockDistribution();
+    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+    Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
     reader.close();
     BytesWritable[] keys = null;
     if (keyDistri.size() >= numSplits) {
-      keyDistri.resize(numSplits);
-      Assert.assertEquals(totalBytes, keyDistri.length());
+      keyDistri.resize(lastBd);
+      Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
       RawComparable[] rawComparables = keyDistri.getKeys();
       keys = new BytesWritable[rawComparables.length];
       for (int i = 0; i < keys.length; ++i) {

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Mon Mar 22 06:26:41 2010
@@ -286,13 +286,14 @@ public class TestColumnGroupWithWorkPath
     ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
     reader.setProjection(strProjection);
     long totalBytes = reader.getStatus().getSize();
-    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
-    Assert.assertEquals(totalBytes, keyDistri.length());
+    BlockDistribution lastBd = new BlockDistribution();
+    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+    Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
     reader.close();
     BytesWritable[] keys = null;
     if (keyDistri.size() >= numSplits) {
-      keyDistri.resize(numSplits);
-      Assert.assertEquals(totalBytes, keyDistri.length());
+      keyDistri.resize(lastBd);
+      Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
       RawComparable[] rawComparables = keyDistri.getKeys();
       keys = new BytesWritable[rawComparables.length];
       for (int i = 0; i < keys.length; ++i) {