
svn commit: r920562 - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/test/org/apache/hadoop/zebra/io/

Author: yanz
Date: Mon Mar  8 23:20:34 2010
New Revision: 920562

URL: http://svn.apache.org/viewvc?rev=920562&view=rev
Log:
PIG-1198: performance improvements through use of unsorted input splits that span multiple files (yanz)

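The heart of this change is in getRowSplits() of both TableInputFormat classes: instead of turning every TFile of the split column group into its own input split, consecutive files are now batched into a single unsorted split until the batch reaches the goal split size. Below is a simplified, self-contained sketch of that batching rule (hypothetical file sizes; the real code further caps files per batch via batchesPerSplit and marks fully-contained intermediate files with starts[ii] = -1):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchingSketch {
      public static void main(String[] args) {
        long[] lengths = {10, 10, 10, 10, 10, 10, 10, 10}; // hypothetical file sizes (MB)
        final double SPLIT_SLOP = 1.1;                     // same slop factor as the patch
        long splitSize = (long) (25 * SPLIT_SLOP);         // goal split size with 10% slop
        List<Integer> batches = new ArrayList<Integer>();  // batches.get(i) = first file of batch i
        batches.add(0);
        long totalLen = 0;
        for (int j = 0; j < lengths.length; j++) {
          totalLen += lengths[j];
          if (totalLen >= splitSize) {                     // batch is full: file j opens the next one
            batches.add(j);
            totalLen = lengths[j];
          }
        }
        batches.add(lengths.length);                       // close the final batch
        System.out.println(batches);                       // [0, 2, 4, 6, 8]: 8 files -> 4 row splits
      }
    }
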
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Mar  8 23:20:34 2010
@@ -58,6 +58,8 @@
 
   OPTIMIZATIONS
 
+    PIG-1198: performance improvements through use of unsorted input splits that span multiple files (yanz)
+
   BUG FIXES
 
     PIG-1276: Pig resource schema interface changed, so Zebra needs to catch exception thrown from the new interfaces. (xuefuz via yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Mon Mar  8 23:20:34 2010
@@ -44,6 +44,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.tfile.TFile;
 import org.apache.hadoop.zebra.tfile.Utils;
 import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
@@ -231,6 +232,7 @@
     private MetaFile.Reader metaReader;
     private BasicTableStatus status;
     private int firstValidCG = -1; /// First column group that exists.
+    private int rowSplitCGIndex = -1;
     Partition partition;
     ColumnGroup.Reader[] colGroups;
     Tuple[] cgTuples;
@@ -257,6 +259,7 @@
         }
         partition.setSource(cgTuples);
         inferredMapping = true;
+        buildStatus();
       }
       else {
         // the projection is not changed, so we do not need to recalculate the
@@ -434,9 +437,12 @@
     public BlockDistribution getBlockDistribution(RangeSplit split)
         throws IOException {
       BlockDistribution bd = new BlockDistribution();
-      for (int nx = 0; nx < colGroups.length; nx++) {
-        if (!isCGDeleted(nx)) {
-          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
+      if (firstValidCG >= 0)
+      {
+        for (int nx = 0; nx < colGroups.length; nx++) {
+          if (partition.isCGNeeded(nx) && !isCGDeleted(nx)) {
+            bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
+          }
         }
       }
       return bd;
@@ -644,10 +650,9 @@
      *         construct a TableScanner later. 
      *         
      */
-    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex) throws IOException {
-      List<RowSplit> ret; 
-      
-      List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths);
+    public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
+      List<RowSplit> ret;      
+      List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths, batchSizes, numBatches);
       int numSlices = cgSplits.size();
       ret = new ArrayList<RowSplit>(numSlices);
       for (int slice = 0; slice < numSlices; slice++) {
@@ -658,6 +663,15 @@
       return ret;
     }
     
+    /**
+     * Rearrange the files according to the column group index ordering
+     * 
+     * @param fileStatus array of FileStatus to be rearranged
+     */
+    public void rearrangeFileIndices(FileStatus[] fileStatus) throws IOException
+    {
+      colGroups[getRowSplitCGIndex()].rearrangeFileIndices(fileStatus);
+    }
 
     /** 
      * Get index of the column group that will be used for row-based split. 
@@ -665,32 +679,33 @@
      */
     public int getRowSplitCGIndex() throws IOException {
       // Try to find the largest non-deleted and used column group by projection;
-      int largestCGIndex = -1;
-      int splitCGIndex = -1;
-      long largestCGSize = -1;
-      for (int i=0; i<colGroups.length; i++) {
-        if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
-          continue;
-        }
-        ColumnGroup.Reader reader = colGroups[i];
-        BasicTableStatus btStatus = reader.getStatus();
-        long size = btStatus.getSize();
-        if (size > largestCGSize) {
-          largestCGIndex = i;
-          largestCGSize = size;
-        }
-      }
-     
-      /* We do have a largest non-deleted and used column group,
-      and we use it to do split. */
-      if (largestCGIndex >= 0) { 
-        splitCGIndex = largestCGIndex;
-      } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
-                                      then we use the first non-deleted column group to do split if it exists. */
-        splitCGIndex = firstValidCG; 
+      if (rowSplitCGIndex == -1)
+      {
+        int largestCGIndex = -1;
+        long largestCGSize = -1;
+        for (int i=0; i<colGroups.length; i++) {
+          if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
+            continue;
+          }
+          ColumnGroup.Reader reader = colGroups[i];
+          BasicTableStatus btStatus = reader.getStatus();
+          long size = btStatus.getSize();
+          if (size > largestCGSize) {
+            largestCGIndex = i;
+            largestCGSize = size;
+          }
+        }
+       
+        /* We do have a largest non-deleted and used column group,
+        and we use it to do split. */
+        if (largestCGIndex >= 0) { 
+          rowSplitCGIndex = largestCGIndex;
+        } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
+                                        then we use the first non-deleted column group to do split if it exists. */
+          rowSplitCGIndex = firstValidCG; 
+        } 
       } 
-     
-      return splitCGIndex;
+      return rowSplitCGIndex;
     }
 
 
@@ -916,7 +931,7 @@
                        Partition partition) throws IOException {
         init(rowSplit, null, null, null, closeReader, partition);
       }      
-      
+    
       /**
        * Creates new CGRowSplit. If the startRow in rowSplit is not set 
        * (i.e. < 0), it sets the startRow and numRows based on 'startByte' 
@@ -928,17 +943,6 @@
         int cgIdx = rowSplit.getCGIndex();
         
         CGRowSplit cgSplit = new CGRowSplit();
-        cgSplit.name = inputCGSplit.name;
-        // startByte and numBytes from inputCGSplit are ignored, since
-        // they make sense for only one CG.
-        cgSplit.startRow = inputCGSplit.startRow;
-        cgSplit.numRows = inputCGSplit.numRows;
-        cgSplit.size = inputCGSplit.size;
-        
-        if (cgSplit.startRow >= 0) {
-          //assume the rows are already set up.
-          return cgSplit;
-        }
         
         // Find the row range :
         if (isCGDeleted(cgIdx)) {
@@ -946,8 +950,7 @@
         }
         
         //fill the row numbers.
-        colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit.startByte,
-                                      inputCGSplit.numBytes);
+        colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit);
         return cgSplit;
       }
     
@@ -985,7 +988,7 @@
         if (rowSplit != null) {
           cgRowSplit = makeCGRowSplit(rowSplit);
         }
-        
+
         try {
           schema = partition.getProjection();
           cgScanners = new CGScanner[colGroups.length];

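In the new BasicTable.Reader.rowSplit() signature above, the batches array and numBatches encode those file groups: batches[i] is the index of the first file of batch i and batches[i + 1] - 1 the index of its last. A hypothetical helper that walks the encoding the same way ColumnGroup.Reader.rowSplit() does:

    import org.apache.hadoop.fs.Path;

    // Hypothetical: print the file range each batch covers, using the same
    // decoding as ColumnGroup.Reader.rowSplit(starts, lengths, paths, batches, numBatches).
    class BatchPrinter {
      static void printBatches(int[] batches, int numBatches, Path[] paths) {
        for (int i = 0; i < numBatches; i++) {
          int indexFirst = batches[i];
          int indexLast = batches[i + 1] - 1;
          System.out.println("batch " + i + ": " + paths[indexFirst].getName()
              + " .. " + paths[indexLast].getName()
              + " (" + (indexLast - indexFirst + 1) + " files)");
        }
      }
    }
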
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Mon Mar  8 23:20:34 2010
@@ -522,11 +522,21 @@
       }
 
       BlockDistribution ret = new BlockDistribution();
-      if (split.name != null)
+      for (int i = 0; i < split.length; i++)
       {
-        FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name)); 
+        FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.names[i]));
         
-        BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+        BlockLocation[] locations = null;
+        if (i == 0) {
+          if (split.startByteFirst != -1)
+            locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, split.numBytesFirst);
+        } else if (i == split.length - 1) {
+          if (split.startByteLast != -1)
+            locations = fs.getFileBlockLocations(tfileStatus, split.startByteLast, split.numBytesLast);
+        }
+        if (locations == null)
+          locations = fs.getFileBlockLocations(tfileStatus, 0, tfileStatus.getLen());
+
         for (BlockLocation l : locations) {
           ret.add(l);
         }
@@ -541,44 +551,86 @@
     * It is assumed that 'startByte' and 'numBytes' in rowSplit itself
     * are not valid.
     */
-    void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
-                      throws IOException {
+    void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src) throws IOException {
 
-      if (rowSplit.name == null)
+      if (src.names == null || src.length == 0)
         return;
 
-      Path tfPath = new Path(path, rowSplit.name);
-
-      long size = rowSplit.size;
-      if (size == 0)
+      boolean noSizeInIndex = false;
+      long[] sizes = rowSplit.sizes;
+      if (sizes == null)
       {
         /* the on disk table is sorted. Later this will be made unnecessary when
          * CGIndexEntry serializes its bytes field and the meta file versioning is
          * supported.
          */ 
-        FileStatus tfile = fs.getFileStatus(tfPath);
-        size = tfile.getLen();
+        noSizeInIndex = true;
       }
+      rowSplit.names = src.names;
+      rowSplit.length = src.length;
+      rowSplit.startByteFirst = src.startByteFirst;
+      rowSplit.numBytesFirst = src.numBytesFirst;
+      rowSplit.startByteLast = src.startByteLast;
+      rowSplit.numBytesLast = src.numBytesLast;
+
+      Path firstPath = null, lastPath;
       TFile.Reader reader = null;
       
-      try {
-        reader = new TFile.Reader(fs.open(tfPath),
-                                  size, conf);
-
-        long startRow = reader.getRecordNumNear(startOffset);
-        long endRow = reader.getRecordNumNear(startOffset + length);
+      if (src.startByteFirst != -1)
+      {
+        firstPath = new Path(path, rowSplit.names[0]);
+        long size;
+        if (noSizeInIndex)
+        {
+          FileStatus tfile = fs.getFileStatus(firstPath);
+          size = tfile.getLen();
+        } else
+          size = sizes[0];
+        reader = new TFile.Reader(fs.open(firstPath), size, conf);
+        try {
+          long startRow = reader.getRecordNumNear(src.startByteFirst);
+          long endRow = reader.getRecordNumNear(src.startByteFirst + src.numBytesFirst);
 
-        if (endRow < startRow) {
-          endRow = startRow;
+          if (endRow < startRow)
+            endRow = startRow;
+          rowSplit.startRowFirst = startRow;
+          rowSplit.numRowsFirst = endRow - startRow;
+        } catch (IOException e) {
+          reader.close();
+          throw e;
         }
+      }
+      if (src.startByteLast != -1 && rowSplit.length > 1)
+      {
+        lastPath = new Path(path, rowSplit.names[rowSplit.length - 1]);
+        if (reader == null || !firstPath.equals(lastPath))
+        {
+          if (reader != null)
+            reader.close();
+          long size;
+          if (noSizeInIndex)
+          {
+            FileStatus tfile = fs.getFileStatus(lastPath);
+            size = tfile.getLen();
+          } else
+            size = sizes[rowSplit.length - 1];
+          reader = new TFile.Reader(fs.open(lastPath), size, conf);
+        }
+        try {
+          long startRow = reader.getRecordNumNear(src.startByteLast);
+          long endRow = reader.getRecordNumNear(src.startByteLast + src.numBytesLast);
 
-        rowSplit.startRow = startRow;
-        rowSplit.numRows = endRow - startRow;
-      } finally {
-        if (reader != null) {
+          if (endRow < startRow)
+            endRow = startRow;
+          rowSplit.startRowLast = startRow;
+          rowSplit.numRowsLast = endRow - startRow;
+        } catch (IOException e) {
           reader.close();
+          throw e;
         }
       }
+      if (reader != null)
+        reader.close();
     }
     
     /**
@@ -770,22 +822,55 @@
      * @return A list of CGRowSplit objects. 
      *         
      */
-    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths) throws IOException {
+    public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int[] batches, int numBatches) throws IOException {
       List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
+      CGRowSplit cgRowSplit;
+      long startFirst, bytesFirst, startLast, bytesLast;
+      int length;
        
-      for (int i=0; i<starts.length; i++) {
-        long start = starts[i];
-        long length = lengths[i];
-        Path path = paths[i];
-        if (cgindex == null)
-          cgindex = buildIndex(fs, this.path, dirty, conf);
-        long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
-        lst.add(new CGRowSplit(path.getName(), start, length, size));
+      if (numBatches > 0 && cgindex == null)
+        cgindex = buildIndex(fs, this.path, dirty, conf);
+
+      for (int i=0; i< numBatches; i++) {
+        int indexFirst = batches[i];
+        int indexLast = batches[i+1] - 1;
+        startFirst = starts[indexFirst];
+        bytesFirst = lengths[indexFirst];
+        startLast = starts[indexLast];
+        bytesLast = lengths[indexLast];
+        length = batches[i+1] - batches[i];
+        String[] namesInSplit = new String[length];
+        long[] sizesInSplit = new long[length];
+        for (int j = 0; j < length; j++)
+        {
+          namesInSplit[j] = paths[indexFirst+j].getName();
+          sizesInSplit[j] = cgindex.get(cgindex.getFileIndex(paths[indexFirst+j])).bytes;
+        }
+        cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, fs, conf, length, 
+                startFirst, bytesFirst, startLast, bytesLast);
+        lst.add(cgRowSplit);
       }
       
       return lst;
-    } 
+    }
     
+    void rearrangeFileIndices(FileStatus[] fileStatus) throws IOException {
+      int size = fileStatus.length;
+      FileStatus[] result = new FileStatus[size];
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
+      if (size < cgindex.size())
+        throw new AssertionError("Incorrect file list size");
+      for (int j, i = 0; i < size; i++)
+      {
+        j = cgindex.getFileIndex(fileStatus[i].getPath());
+        if (j != -1)
+          result[j] = fileStatus[i];
+      }
+      for (int i = 0; i < size; i++)
+        fileStatus[i] = result[i];
+    }
+
     /**
      * Is the ColumnGroup sorted?
      * 
@@ -814,7 +899,7 @@
       TupleReader tupleReader;
 
       TFileScanner(FileSystem fs, Path path, CGRowSplit rowRange, 
-                    RawComparable begin, RawComparable end, 
+                    RawComparable begin, RawComparable end, boolean first, boolean last,
                     CGSchema cgschema, Projection projection,
           Configuration conf) throws IOException, ParseException {
         try {
@@ -823,9 +908,15 @@
            * compressor is inside cgschema
            */
           reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
-          if (rowRange != null) {
-            scanner = reader.createScannerByRecordNum(rowRange.startRow, 
-                                         rowRange.startRow + rowRange.numRows);
+          if (rowRange != null && rowRange.startByteFirst != -1) {
+            if (first && rowRange.startByteFirst != -1)
+              scanner = reader.createScannerByRecordNum(rowRange.startRowFirst, 
+                                              rowRange.startRowFirst + rowRange.numRowsFirst);
+            else if (last && rowRange.startByteLast != -1)
+              scanner = reader.createScannerByRecordNum(rowRange.startRowLast, 
+                  rowRange.startRowLast + rowRange.numRowsLast);
+            else
+              scanner = reader.createScanner();
           } else {
             /* TODO: more investigation is needed for the following.
              *  using deprecated API just so that zebra can work with 
@@ -941,11 +1032,36 @@
      */
     class CGScanner implements TableScanner {
       private Projection logicalSchema = null;
-      private TFileScanner[] scanners;
+      private TFileScannerInfo[] scanners;
       private boolean closeReader;
       private int beginIndex, endIndex;
       private int current; // current scanner
       private boolean scannerClosed = true;
+      private CGRowSplit rowRange;
+      private TFileScanner scanner;
+      
+      private class TFileScannerInfo {
+        boolean first, last;
+        Path path;
+        RawComparable begin, end;
+        TFileScannerInfo(boolean first, boolean last, Path path, RawComparable begin, RawComparable end) {
+          this.first = first;
+          this.last = last;
+          this.begin = begin;
+          this.end = end;
+          this.path = path;
+        }
+        
+        TFileScanner getTFileScanner() throws IOException {
+          try {
+            return new TFileScanner(fs, path, rowRange, 
+                  begin, end, first, last, cgschema, logicalSchema, conf);
+          } catch (ParseException e) {
+            throw new IOException(e.getMessage());
+          }
+        }
+      }
+
 
       CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
       ParseException {
@@ -970,9 +1086,8 @@
        */
       CGScanner(CGRowSplit rowRange, boolean closeReader) 
                  throws IOException, ParseException {
-        
         beginIndex = 0;
-        endIndex = 1;
+        endIndex = rowRange.length;
         init(rowRange, null, null, closeReader);
       }
       
@@ -995,49 +1110,52 @@
       private void init(CGRowSplit rowRange, RawComparable beginKey, 
                         RawComparable endKey, boolean doClose) 
              throws IOException, ParseException {
+        this.rowRange = rowRange;
         if (beginIndex > endIndex) {
           throw new IllegalArgumentException("beginIndex > endIndex");
         }
         logicalSchema = ColumnGroup.Reader.this.getProjection();
-        List<TFileScanner> tmpScanners =
-            new ArrayList<TFileScanner>(endIndex - beginIndex);
+        List<TFileScannerInfo> tmpScanners =
+            new ArrayList<TFileScannerInfo>(endIndex - beginIndex);
         try {
+          boolean first, last, realFirst = true;
+          Path myPath;
           for (int i = beginIndex; i < endIndex; ++i) {
-            RawComparable begin = (i == beginIndex) ? beginKey : null;
-            RawComparable end = (i == endIndex - 1) ? endKey : null;
-            TFileScanner scanner;
-            if (rowRange != null)
-              scanner =
-                new TFileScanner(fs, new Path(path, rowRange.name), rowRange, 
-                                 begin, end,
-                                 cgschema, logicalSchema, conf);
+            first = (i == beginIndex);
+            last = (i == endIndex -1);
+            RawComparable begin = first ? beginKey : null;
+            RawComparable end = last ? endKey : null;
+            TFileScannerInfo scanner;
+            if (rowRange == null)
+              myPath = cgindex.getPath(i, path);
             else
-              scanner =
-                new TFileScanner(fs, cgindex.getPath(i, path), null, 
-                                 begin, end,
-                                 cgschema, logicalSchema, conf);
-            // skip empty scanners.
-            if (!scanner.atEnd()) {
-              tmpScanners.add(scanner);
-            }
-            else {
-              scanner.close();
+              myPath = new Path(path, rowRange.names[i]);
+            scanner =
+                new TFileScannerInfo(first, last, myPath, begin, end);
+            if (realFirst) {
+              this.scanner = scanner.getTFileScanner();
+              if (this.scanner.atEnd()) {
+                this.scanner.close();
+                this.scanner = null;
+              } else {
+                realFirst = false;
+                tmpScanners.add(scanner);
+              }
+            } else {
+              TFileScanner myScanner = scanner.getTFileScanner();
+              if (!myScanner.atEnd())
+                tmpScanners.add(scanner);
+              myScanner.close();
             }
           }
-          scanners = tmpScanners.toArray(new TFileScanner[tmpScanners.size()]);
+          scanners = tmpScanners.toArray(new TFileScannerInfo[tmpScanners.size()]);
           this.closeReader = doClose;
           scannerClosed = false;
         }
         finally {
           if (scannerClosed) { // failed to initialize the object.
-            for (int i = 0; i < tmpScanners.size(); ++i) {
-              try {
-                tmpScanners.get(i).close();
-              }
-              catch (Exception e) {
-                // no op
-              }
-            }
+            if (scanner != null)
+              scanner.close();
           }
         }
       }
@@ -1047,7 +1165,7 @@
           if (atEnd()) {
             throw new EOFException("No more key-value to read");
           }
-          scanners[current].getKey(key);
+          scanner.getKey(key);
         }
 
         @Override
@@ -1056,19 +1174,19 @@
             throw new EOFException("No more key-value to read");
           }
           try {
-            scanners[current].getValue(row);
+            scanner.getValue(row);
           } catch (ParseException e) {
             throw new IOException("Invalid Projection: "+e.getMessage());
           }
         }
 
       public void getCGKey(BytesWritable key) throws IOException {
-        scanners[current].getKey(key);
+        scanner.getKey(key);
       }
 
       public void getCGValue(Tuple row) throws IOException {
         try {
-            scanners[current].getValue(row);
+            scanner.getValue(row);
           } catch (ParseException e) {
             throw new IOException("Invalid Projection: "+e.getMessage());
           }
@@ -1085,29 +1203,41 @@
 
       @Override
       public boolean advance() throws IOException {
-          if (atEnd()) {
-            return false;
-          }
-          scanners[current].advance();
-          if (scanners[current].atEnd()) {
+        if (atEnd()) {
+          return false;
+        }
+        scanner.advance();
+        while (true)
+        {
+          if (scanner.atEnd()) {
+            scanner.close();
+            scanner = null;
             ++current;
             if (!atEnd()) {
-              scanners[current].rewind();
-            }
-          }
-          return true;
+              scanner = scanners[current].getTFileScanner();
+            } else
+              return false;
+          } else
+            return true;
         }
+      }
 
       public boolean advanceCG() throws IOException {
-          scanners[current].advance();
-          if (scanners[current].atEnd()) {
+        scanner.advance();
+        while (true)
+        {
+          if (scanner.atEnd()) {
+            scanner.close();
+            scanner = null;
             ++current;
             if (!atEnd()) {
-              scanners[current].rewind();
-            }
-          }
-          return true;
+              scanner = scanners[current].getTFileScanner();
+            } else
+              return false;
+          } else
+            return true;
         }
+      }
 
       @Override
       public boolean atEnd() throws IOException {
@@ -1136,12 +1266,34 @@
           index = beginIndex;
         }
 
+        int prevCurrent = current;
         current = index - beginIndex;
-        return scanners[current].seekTo(key);
+        if (current != prevCurrent)
+        {
+          if (scanner != null)
+          {
+            try {
+              scanner.close();
+            } catch (Exception e) {
+              // no-op
+            }
+          }
+          scanner = scanners[current].getTFileScanner();
+        }
+        return scanner.seekTo(key);
       }
 
       @Override
       public void seekToEnd() throws IOException {
+        if (scanner != null)
+        {
+          try {
+            scanner.close();
+          } catch (Exception e) {
+            // no-op
+          }
+        }
+        scanner = null;
         current = scanners.length;
       }
 
@@ -1149,11 +1301,12 @@
       public void close() throws IOException {
         if (!scannerClosed) {
           scannerClosed = true;
-          for (int i = 0; i < scanners.length; ++i) {
+          if (scanner != null)
+          {
             try {
-              scanners[i].close();
-            }
-            catch (Exception e) {
+              scanner.close();
+              scanner = null;
+            } catch (Exception e) {
               // no-op
             }
           }
@@ -1191,18 +1344,35 @@
     }
     
     public static class CGRowSplit implements Writable {
-      String name;
-      long startByte = -1;
-      long numBytes = -1;
-      long startRow = -1;
-      long numRows = -1;
-      long size = 0; // size of the file in the selected CG 
+      int length; // number of files in the batch
+      long startByteFirst = -1;
+      long numBytesFirst;
+      long startRowFirst = -1;
+      long numRowsFirst = -1;
+      long startByteLast = -1;
+      long numBytesLast;
+      long startRowLast = -1;
+      long numRowsLast = -1;
+      String[] names;
+      long[] sizes = null;
+
+      CGRowSplit(String[] names, long[] sizes, FileSystem fs, Configuration conf,
+          int length, long startFirst, long bytesFirst,
+          long startLast, long bytesLast) throws IOException {
+        this.names = names;
+        this.sizes = sizes;
+        this.length = length;
 
-      CGRowSplit(String name, long start, long len, long size) {
-        this.name = name;
-        this.startByte = start;
-        this.numBytes = len;
-        this.size = size;
+        if (startFirst != -1)
+        {
+          startByteFirst = startFirst;
+          numBytesFirst = bytesFirst;
+        }
+        if (startLast != -1 && this.length > 1)
+        {
+          startByteLast = startLast;
+          numBytesLast = bytesLast;
+        }
       }
 
       public CGRowSplit() {
@@ -1212,35 +1382,64 @@
       @Override
       public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("{name = " + name + "}\n");       
-        sb.append("{startByte = " + startByte + "}\n");
-        sb.append("{numBytes = " + numBytes + "}\n");
-        sb.append("{startRow = " + startRow + "}\n");
-        sb.append("{numRows = " + numRows + "}\n");
-        sb.append("{size = " + size + "}\n");
+        sb.append("{length = " + length + "}\n");
+        for (int i = 0; i < length; i++)
+        {
+          sb.append("{name = " + names[i] + "}\n");
+          sb.append("{size = " + sizes[i] + "}\n");
+        }
+        sb.append("{startByteFirst = " + startByteFirst + "}\n");
+        sb.append("{numBytesFirst = " + numBytesFirst + "}\n");
+        sb.append("{startRowFirst = " + startRowFirst + "}\n");
+        sb.append("{numRowsFirst = " + numRowsFirst + "}\n");
+        sb.append("{startByteLast = " + startByteLast + "}\n");
+        sb.append("{numBytesLast = " + numBytesLast + "}\n");
+        sb.append("{startRowLast = " + startRowLast + "}\n");
+        sb.append("{numRowsLast = " + numRowsLast + "}\n");
         
         return sb.toString();
       }
 
       @Override
       public void readFields(DataInput in) throws IOException {
-        name = Utils.readString(in);
-        startByte = Utils.readVLong(in);
-        numBytes = Utils.readVLong(in);
-        startRow = Utils.readVLong(in);
-        numRows = Utils.readVLong(in);
-        size = Utils.readVLong(in);
+        length = Utils.readVInt(in);
+        if (length > 0)
+        {
+          names = new String[length];
+          sizes = new long[length];
+        }
+        for (int i = 0; i < length; i++)
+        {
+          names[i] = Utils.readString(in);
+          sizes[i] = Utils.readVLong(in);
+        }
+        startByteFirst = Utils.readVLong(in);
+        numBytesFirst = Utils.readVLong(in);
+        startRowFirst = Utils.readVLong(in);
+        numRowsFirst = Utils.readVLong(in);
+        startByteLast = Utils.readVLong(in);
+        numBytesLast = Utils.readVLong(in);
+        startRowLast = Utils.readVLong(in);
+        numRowsLast = Utils.readVLong(in);
       }
 
       @Override
       public void write(DataOutput out) throws IOException {
-        Utils.writeString(out, name);
-        Utils.writeVLong(out, startByte);
-        Utils.writeVLong(out, numBytes);
-        Utils.writeVLong(out, startRow);
-        Utils.writeVLong(out, numRows);
-        Utils.writeVLong(out, size);
-      }      
+        Utils.writeVInt(out, length);
+        for (int i = 0; i < length; i++)
+        {
+          Utils.writeString(out, names[i]);
+          Utils.writeVLong(out, sizes[i]);
+        }
+        Utils.writeVLong(out, startByteFirst);
+        Utils.writeVLong(out, numBytesFirst);
+        Utils.writeVLong(out, startRowFirst);
+        Utils.writeVLong(out, numRowsFirst);
+        Utils.writeVLong(out, startByteLast);
+        Utils.writeVLong(out, numBytesLast);
+        Utils.writeVLong(out, startRowLast);
+        Utils.writeVLong(out, numRowsLast);
+      }
     }
     
     private static class SplitColumn {
@@ -1852,7 +2051,7 @@
           return cgie.getIndex(); 
         }
       }
-      throw new IOException("File " + filename + " is not in the column group index"); 
+      return -1;
     }
 
     int size() {

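A CGRowSplit now spans a whole batch of files, but only the first and the last file of the batch may be partial: startByteFirst/startRowFirst/numRowsFirst bound the first file, startByteLast/startRowLast/numRowsLast bound the last, and -1 acts as the "not partial" sentinel. Every file in between is always scanned whole, which is what the new first/last flags in TFileScanner select on. A hypothetical restatement of that selection:

    // Hypothetical: the record range a scanner over file i of an n-file batch
    // reads, mirroring the first/last branches added to TFileScanner above.
    class ScanRangeSketch {
      static String scanRange(int i, int n,
          long startByteFirst, long startRowFirst, long numRowsFirst,
          long startByteLast, long startRowLast, long numRowsLast) {
        if (i == 0 && startByteFirst != -1)      // partial first file of the batch
          return "records [" + startRowFirst + ", " + (startRowFirst + numRowsFirst) + ")";
        if (i == n - 1 && startByteLast != -1)   // partial last file of the batch
          return "records [" + startRowLast + ", " + (startRowLast + numRowsLast) + ")";
        return "whole file";                     // intermediate files are never split
      }
    }
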
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Mon Mar  8 23:20:34 2010
@@ -29,6 +29,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.RawComparator;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.zebra.io.BasicTable;
@@ -379,7 +382,6 @@
 		 {
 			 throw new IOException("The table is not properly sorted");
 		 }
-		 setSorted(conf);
 	 } else {
 		 List<LeafTableInfo> leaves = expr.getLeafTables(null);
 		 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
@@ -403,9 +405,9 @@
          }
        }
 		 }
-		 // need key range input splits for sorted table union
-		 setSorted(conf);
 	 }
+	 // need key range input splits for sorted table union
+	 setSorted(conf);
   }
   
   /**
@@ -635,8 +637,48 @@
   }
   
   private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
-    public DummyFileInputFormat(long minSplitSize) {
+    /**
+     * the next constant and class are copies from FileInputFormat
+     */
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+        public boolean accept(Path p){
+          String name = p.getName(); 
+          return !name.startsWith("_") && !name.startsWith("."); 
+        }
+      }; 
+
+    /**
+     * Proxy PathFilter that accepts a path only if all filters given in the
+     * constructor do. Used by the listPaths() to apply the built-in
+     * hiddenFileFilter together with a user provided one (if any).
+     */
+    private static class MultiPathFilter implements PathFilter {
+      private List<PathFilter> filters;
+
+      public MultiPathFilter(List<PathFilter> filters) {
+        this.filters = filters;
+      }
+
+      public boolean accept(Path path) {
+        for (PathFilter filter : filters) {
+          if (!filter.accept(path)) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+    private Integer[] fileNumbers = null;
+
+    private List<BasicTable.Reader> readers;
+
+    public Integer[] getFileNumbers() {
+      return fileNumbers;
+    }
+
+    public DummyFileInputFormat(long minSplitSize, List<BasicTable.Reader> readers) {
       super.setMinSplitSize(minSplitSize);
+      this.readers = readers;
     }
     
     @Override
@@ -645,12 +687,93 @@
       // no-op
       return null;
     }
+
+    @Override
+    public long computeSplitSize(long goalSize, long minSize, long blockSize) {
+      return super.computeSplitSize(goalSize, minSize, blockSize);
+    }
+
+    /**
+     * copy from FileInputFormat: add assignment to table file numbers
+     */
+    @Override
+    public FileStatus[] listStatus(JobConf job) throws IOException {
+      Path[] dirs = getInputPaths(job);
+      if (dirs.length == 0) {
+        throw new IOException("No input paths specified in job");
+      }
+
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      List<IOException> errors = new ArrayList<IOException>();
+      
+      // creates a MultiPathFilter with the hiddenFileFilter and the
+      // user provided one (if any).
+      List<PathFilter> filters = new ArrayList<PathFilter>();
+      filters.add(hiddenFileFilter);
+      PathFilter jobFilter = getInputPathFilter(job);
+      if (jobFilter != null) {
+        filters.add(jobFilter);
+      }
+      PathFilter inputFilter = new MultiPathFilter(filters);
+
+      ArrayList<Integer> fileNumberList  = new ArrayList<Integer>();
+      int index = 0;
+      for (Path p: dirs) {
+        FileSystem fs = p.getFileSystem(job); 
+        FileStatus[] matches = fs.globStatus(p, inputFilter);
+        if (matches == null) {
+          errors.add(new IOException("Input path does not exist: " + p));
+        } else if (matches.length == 0) {
+          errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+        } else {
+          for (FileStatus globStat: matches) {
+            if (globStat.isDir()) {
+              FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
+              // reorder according to CG index
+              BasicTable.Reader reader = readers.get(index);
+              reader.rearrangeFileIndices(fileStatuses);
+              for(FileStatus stat: fileStatuses) {
+                if (stat != null)
+                  result.add(stat);
+              }
+              fileNumberList.add(fileStatuses.length);
+            } else {
+              result.add(globStat);
+              fileNumberList.add(1);
+            }
+          }
+        }
+        index++;
+      }
+      fileNumbers = new Integer[fileNumberList.size()];
+      fileNumberList.toArray(fileNumbers);
+
+      if (!errors.isEmpty()) {
+        throw new InvalidInputException(errors);
+      }
+      LOG.info("Total input paths to process : " + result.size()); 
+      return result.toArray(new FileStatus[result.size()]);
+    }
   }
   
   private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
-      TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
+      TableExpr expr, List<BasicTable.Reader> readers,
+      List<BasicTableStatus> status) throws IOException {
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
-    DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
+
+    long minSplitSize = getMinSplitSize(conf);
+  
+    long minSize = Math.max(conf.getLong("mapred.min.split.size", 1), minSplitSize);
+    long totalBytes = 0;
+    for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext(); )
+    {
+      totalBytes += it.next().getSize();
+    }
+    long goalSize = totalBytes / (numSplits < 1 ? 1 : numSplits);
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    PathFilter filter = null;
+    List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -660,28 +783,116 @@
       /* We can create input splits only if there does exist a valid column group for split.
        * Otherwise, we do not create input splits. */
       if (splitCGIndex >= 0) {        
-        Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
-        DummyFileInputFormat.setInputPaths(conf, path);
-        PathFilter filter = reader.getPathFilter(conf);
-        DummyFileInputFormat.setInputPathFilter(conf, filter.getClass());
-        InputSplit[] inputSplits = helper.getSplits(conf, (numSplits < 1 ? 1 : numSplits));
+        realReaders.add(reader);
+        if (first)
+        {
+          // filter is identical across tables
+          filter = reader.getPathFilter(conf);
+          first = false;
+        } else
+          sb.append(",");
+        sb.append(reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+      }
+    }
+    
+    DummyFileInputFormat helper = new DummyFileInputFormat(minSplitSize, realReaders);
+
+    if (!realReaders.isEmpty())
+    {
+      DummyFileInputFormat.setInputPaths(conf, sb.toString());
+      DummyFileInputFormat.setInputPathFilter(conf, filter.getClass());
+      InputSplit[] inputSplits = helper.getSplits(conf, (numSplits < 1 ? 1 : numSplits));
+
+      int batchesPerSplit = inputSplits.length / (numSplits < 1 ? 1 : numSplits);
+      if (batchesPerSplit <= 0)
+        batchesPerSplit = 1;
+
+      /*
+       * Potential file batching optimizations include:
+       * 1) sort single file inputSplits in the descending order of their sizes so
+       *    that the ops of new file opens are spread to a maximum degree;
+       * 2) batching the files with maximum block distribution affinities into the same input split
+       */
+
+      int[] inputSplitBoundaries = new int[realReaders.size()];
+      long start, prevStart = Long.MIN_VALUE;
+      int tableIndex = 0, fileNumber = 0;
+      Integer[] fileNumbers = helper.getFileNumbers();
+      if (fileNumbers.length != realReaders.size())
+        throw new IOException("Number of tables in input paths of input splits is incorrect.");
+      for (int j=0; j<inputSplits.length; j++) {
+        FileSplit fileSplit = (FileSplit) inputSplits[j];
+        start = fileSplit.getStart();
+        if (start <= prevStart)
+        {
+          fileNumber++;
+          if (fileNumber >= fileNumbers[tableIndex])
+          {
+            inputSplitBoundaries[tableIndex++] = j;
+            fileNumber = 0;
+          }
+        }
+        prevStart = start;
+      }
+      inputSplitBoundaries[tableIndex++] =  inputSplits.length;
+      if (tableIndex != realReaders.size())
+        throw new IOException("Number of tables in input splits is incorrect.");
+      for (tableIndex = 0; tableIndex < realReaders.size(); tableIndex++)
+      {
+        int startSplitIndex = (tableIndex == 0 ? 0 : inputSplitBoundaries[tableIndex - 1]);
+        int splitLen = (tableIndex == 0 ? inputSplitBoundaries[0] :
+            inputSplitBoundaries[tableIndex] - inputSplitBoundaries[tableIndex-1]);
+        BasicTable.Reader reader = realReaders.get(tableIndex);
+        /* Get the index of the column group that will be used for row-split.*/
+        int splitCGIndex = reader.getRowSplitCGIndex();
         
-        long starts[] = new long[inputSplits.length];
-        long lengths[] = new long[inputSplits.length];
-        Path paths[] = new Path [inputSplits.length];
-        for (int j=0; j<inputSplits.length; j++) {
+        long starts[] = new long[splitLen];
+        long lengths[] = new long[splitLen];
+        int batches[] = new int[splitLen + 1];
+        batches[0] = 0;
+        int numBatches = 0;
+        int batchSize = 0;
+        Path paths[] = new Path [splitLen];
+        long totalLen = 0;
+        final double SPLIT_SLOP = 1.1;
+        int endSplitIndex = startSplitIndex + splitLen;
+        for (int j=startSplitIndex; j< endSplitIndex; j++) {
           FileSplit fileSplit = (FileSplit) inputSplits[j];
           Path p = fileSplit.getPath();
-          long start = fileSplit.getStart();
+          long blockSize = p.getFileSystem(conf).getBlockSize(p);
+          long splitSize = (long) (helper.computeSplitSize(goalSize, minSize, blockSize) * SPLIT_SLOP);
+          start = fileSplit.getStart();
           long length = fileSplit.getLength();
-
-          starts[j] = start;
-          lengths[j] = length;
-          paths[j] = p;
+          int index = j - startSplitIndex;
+          starts[index] = start;
+          lengths[index] = length;
+          totalLen += length;
+          paths[index] = p;
+          if (totalLen >= splitSize)
+          {
+            for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+              starts[ii] = -1; // all intermediate files are not split
+            batches[++numBatches] = index;
+            batchSize = 1;
+            totalLen = length;
+          } else if (batchSize + 1 > batchesPerSplit) {
+            for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+              starts[ii] = -1; // all intermediate files are not split
+            batches[++numBatches] = index;
+            batchSize = 1;
+            totalLen = length;
+          } else {
+            batchSize++;
+          }
         }
+        for (int ii = batches[numBatches] + 1; ii < splitLen - 1; ii++)
+          starts[ii] = -1; // all intermediate files are not split
+        if (splitLen > 0)
+          batches[++numBatches] = splitLen;
         
-        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
-  
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
+    
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
           RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
@@ -689,7 +900,7 @@
         }
       }
     }
-    
+
     LOG.info("getSplits : returning " + ret.size() + " row splits.");
     return ret.toArray(new InputSplit[ret.size()]);
   }
@@ -728,11 +939,8 @@
         BasicTable.Reader reader =
           new BasicTable.Reader(leaf.getPath(), conf);
         reader.setProjection(leaf.getProjection());
-        if (sorted)
-        {
-          BasicTableStatus s = reader.getStatus();
-          status.add(s);
-        }
+        BasicTableStatus s = reader.getStatus();
+        status.add(s);
         readers.add(reader);
         if (first)
           first = false;
@@ -753,7 +961,7 @@
         return getSortedSplits(conf, numSplits, expr, readers, status);
       }
        
-      return getRowSplits(conf, numSplits, expr, readers);
+      return getRowSplits(conf, numSplits, expr, readers, status);
     } catch (ParseException e) {
       throw new IOException("Projection parsing failed : "+e.getMessage());
     }

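Both TableInputFormat variants (this one and the mapreduce one below) recover per-table boundaries from the combined split list in the same way: FileInputFormat emits splits file by file with strictly increasing start offsets inside each file, so a start offset that fails to increase marks the first split of a new file, and counting those transitions against fileNumbers[tableIndex] (collected by the overridden listStatus()) locates the end of each table. A simplified, self-contained sketch of that detection:

    // Hypothetical: given the start offsets of the combined split list and the
    // per-table file counts, return the exclusive end index of each table's splits.
    class BoundarySketch {
      static int[] tableBoundaries(long[] splitStarts, int[] fileNumbers) {
        int[] boundaries = new int[fileNumbers.length];
        long prevStart = Long.MIN_VALUE;
        int tableIndex = 0, fileNumber = 0;
        for (int j = 0; j < splitStarts.length; j++) {
          if (splitStarts[j] <= prevStart) {     // a new file begins at split j
            fileNumber++;
            if (fileNumber >= fileNumbers[tableIndex]) {
              boundaries[tableIndex++] = j;      // the previous table ended at split j - 1
              fileNumber = 0;
            }
          }
          prevStart = splitStarts[j];
        }
        boundaries[tableIndex] = splitStarts.length;
        return boundaries;
      }
    }
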
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Mon Mar  8 23:20:34 2010
@@ -30,6 +30,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.zebra.io.BasicTable;
@@ -401,7 +404,6 @@
 		 {
 			 throw new IOException("The table is not properly sorted");
 		 }
-		 setSorted( conf );
 	 } else {
 		 List<LeafTableInfo> leaves = expr.getLeafTables(null);
 		 for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
@@ -425,9 +427,9 @@
          }
        }
 		 }
-		 // need key range input splits for sorted table union
-		 setSorted( conf );
 	 }
+	 // need key range input splits for sorted table union
+	 setSorted(conf);
   }
   
   /**
@@ -612,25 +614,145 @@
   }
   
   private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
-    public DummyFileInputFormat(Job job, long minSplitSize) {
-      super.setMinInputSplitSize( job, minSplitSize );
+    /**
+     * the next constant and class are copies from FileInputFormat
+     */
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+        public boolean accept(Path p){
+          String name = p.getName(); 
+          return !name.startsWith("_") && !name.startsWith("."); 
+        }
+      }; 
+ 
+    /**
+     * Proxy PathFilter that accepts a path only if all filters given in the
+     * constructor do. Used by the listPaths() to apply the built-in
+     * hiddenFileFilter together with a user provided one (if any).
+     */
+    private static class MultiPathFilter implements PathFilter {
+      private List<PathFilter> filters;
+ 
+      public MultiPathFilter(List<PathFilter> filters) {
+        this.filters = filters;
+      }
+ 
+      public boolean accept(Path path) {
+        for (PathFilter filter : filters) {
+          if (!filter.accept(path)) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+    private Integer[] fileNumbers = null;
+ 
+    private List<BasicTable.Reader> readers;
+ 
+    public Integer[] getFileNumbers() {
+      return fileNumbers;
+    }
+ 
+    public DummyFileInputFormat(Job job, long minSplitSize, List<BasicTable.Reader> readers) {
+     super.setMinInputSplitSize(job, minSplitSize);
+     this.readers = readers;
     }
     
 
-	@Override
-	public RecordReader<BytesWritable, Tuple> createRecordReader(
-			InputSplit arg0, TaskAttemptContext arg1) throws IOException,
-			InterruptedException {
-		// no-op
-		return null;
-	}
+    @Override
+    public RecordReader<BytesWritable, Tuple> createRecordReader(
+    InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+        InterruptedException {
+       // no-op
+      return null;
+    }
+
+    @Override
+    public long computeSplitSize(long blockSize, long minSize, long maxSize) {
+      return super.computeSplitSize(blockSize, minSize, maxSize);
+    }
+
+    /**
+     * copy from FileInputFormat: add assignment to table file numbers
+     */
+    @Override
+    public List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+      Configuration job = jobContext.getConfiguration();
+      Path[] dirs = getInputPaths(jobContext);
+      if (dirs.length == 0) {
+        throw new IOException("No input paths specified in job");
+      }
+
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      List<IOException> errors = new ArrayList<IOException>();
+      
+      // creates a MultiPathFilter with the hiddenFileFilter and the
+      // user provided one (if any).
+      List<PathFilter> filters = new ArrayList<PathFilter>();
+      filters.add(hiddenFileFilter);
+      PathFilter jobFilter = getInputPathFilter(jobContext);
+      if (jobFilter != null) {
+        filters.add(jobFilter);
+      }
+      PathFilter inputFilter = new MultiPathFilter(filters);
+
+      ArrayList<Integer> fileNumberList  = new ArrayList<Integer>();
+      int index = 0;
+      for (Path p: dirs) {
+        FileSystem fs = p.getFileSystem(job); 
+        FileStatus[] matches = fs.globStatus(p, inputFilter);
+        if (matches == null) {
+          errors.add(new IOException("Input path does not exist: " + p));
+        } else if (matches.length == 0) {
+          errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+        } else {
+          for (FileStatus globStat: matches) {
+            if (globStat.isDir()) {
+              FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
+              // reorder according to CG index
+              BasicTable.Reader reader = readers.get(index);
+              reader.rearrangeFileIndices(fileStatuses);
+              for(FileStatus stat: fileStatuses) {
+                if (stat != null)
+                  result.add(stat);
+              }
+              fileNumberList.add(fileStatuses.length);
+            } else {
+              result.add(globStat);
+              fileNumberList.add(1);
+            }
+          }
+        }
+        index++;
+      }
+      fileNumbers = new Integer[fileNumberList.size()];
+      fileNumberList.toArray(fileNumbers);
+
+      if (!errors.isEmpty()) {
+        throw new InvalidInputException(errors);
+      }
+      LOG.info("Total input paths to process : " + result.size()); 
+      return result;
+    }
   }
   
   private static List<InputSplit> getRowSplits(Configuration conf,
-      TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
+      TableExpr expr, List<BasicTable.Reader> readers,
+      List<BasicTableStatus> status) throws IOException {
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     Job job = new Job(conf);
-    DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
+    long minSplitSize = getMinSplitSize(conf);
+  
+    long minSize = Math.max(conf.getLong("mapreduce.min.split.size", 1), minSplitSize);
+    long totalBytes = 0;
+    for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext(); )
+    {
+      totalBytes += it.next().getSize();
+    }
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    PathFilter filter = null;
+    List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -640,29 +762,103 @@
       /* We can create input splits only if there does exist a valid column group for split.
        * Otherwise, we do not create input splits. */
       if (splitCGIndex >= 0) {        
-        Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
-        DummyFileInputFormat.setInputPaths(job, path);
-        PathFilter filter = reader.getPathFilter( job.getConfiguration() );
-        DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
-        
-        List<InputSplit> inputSplits = helper.getSplits( job );
-        
-        long starts[] = new long[inputSplits.size()];
-        long lengths[] = new long[inputSplits.size()];
-        Path paths[] = new Path [inputSplits.size()];
-        for (int j=0; j<inputSplits.size(); j++) {
-          FileSplit fileSplit = (FileSplit) inputSplits.get( j );
+        realReaders.add(reader);
+         if (first)
+         {
+           // filter is identical across tables
+           filter = reader.getPathFilter(conf);
+           first = false;
+         } else
+           sb.append(",");
+         sb.append(reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+       }
+     }
+     
+     DummyFileInputFormat helper = new DummyFileInputFormat(job, minSplitSize, realReaders);
+ 
+     if (!realReaders.isEmpty())
+     {
+       DummyFileInputFormat.setInputPaths(job, sb.toString());
+       DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
+       List<InputSplit> inputSplitList = helper.getSplits(job);
+       InputSplit[] inputSplits = inputSplitList.toArray(new InputSplit[0]);
+ 
+       /*
+        * Potential file batching optimizations include:
+        * 1) sort single file inputSplits in the descending order of their sizes so
+        *    that the ops of new file opens are spread to a maximum degree;
+        * 2) batching the files with maximum block distribution affinities into the same input split
+        */
+ 
+       int[] inputSplitBoundaries = new int[realReaders.size()];
+       long start, prevStart = Long.MIN_VALUE;
+       int tableIndex = 0, fileNumber = 0;
+       Integer[] fileNumbers = helper.getFileNumbers();
+       if (fileNumbers.length != realReaders.size())
+         throw new IOException("Number of tables in input paths of input splits is incorrect.");
+       for (int j=0; j<inputSplits.length; j++) {
+         FileSplit fileSplit = (FileSplit) inputSplits[j];
+         start = fileSplit.getStart();
+         if (start <= prevStart)
+         {
+           fileNumber++;
+           if (fileNumber >= fileNumbers[tableIndex])
+           {
+             inputSplitBoundaries[tableIndex++] = j;
+             fileNumber = 0;
+           }
+         }
+         prevStart = start;
+       }
+       inputSplitBoundaries[tableIndex++] =  inputSplits.length;
+       if (tableIndex != realReaders.size())
+         throw new IOException("Number of tables in input splits is incorrect.");
+       for (tableIndex = 0; tableIndex < realReaders.size(); tableIndex++)
+       {
+         int startSplitIndex = (tableIndex == 0 ? 0 : inputSplitBoundaries[tableIndex - 1]);
+         int splitLen = (tableIndex == 0 ? inputSplitBoundaries[0] :
+             inputSplitBoundaries[tableIndex] - inputSplitBoundaries[tableIndex-1]);
+         BasicTable.Reader reader = realReaders.get(tableIndex);
+         /* Get the index of the column group that will be used for row-split.*/
+         int splitCGIndex = reader.getRowSplitCGIndex();
+         
+         long starts[] = new long[splitLen];
+         long lengths[] = new long[splitLen];
+         int batches[] = new int[splitLen + 1];
+         batches[0] = 0;
+         int numBatches = 0;
+         Path paths[] = new Path [splitLen];
+         long totalLen = 0;
+         final double SPLIT_SLOP = 1.1;
+         int endSplitIndex = startSplitIndex + splitLen;
+         for (int j=startSplitIndex; j< endSplitIndex; j++) {
+          FileSplit fileSplit = (FileSplit) inputSplits[j];
           Path p = fileSplit.getPath();
-          long start = fileSplit.getStart();
+          long blockSize = p.getFileSystem(conf).getBlockSize(p);
+          long splitSize = (long) (helper.computeSplitSize(blockSize, minSize, totalBytes) * SPLIT_SLOP);
+          start = fileSplit.getStart();
           long length = fileSplit.getLength();
+          int index = j - startSplitIndex;
+          starts[index] = start;
+          lengths[index] = length;
+          totalLen += length;
+          paths[index] = p;
+          if (totalLen >= splitSize)
+          {
 
-          starts[j] = start;
-          lengths[j] = length;
-          paths[j] = p;
+             for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+               starts[ii] = -1; // all intermediate files are not split
+             batches[++numBatches] = index;
+             totalLen = length;
+          }
         }
+        for (int ii = batches[numBatches] + 1; ii < splitLen - 1; ii++)
+          starts[ii] = -1; // all intermediate files are not split
+        if (splitLen > 0)
+          batches[++numBatches] = splitLen;
+        
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
         
-        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
-  
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
           RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
@@ -670,7 +866,7 @@
         }
       }
     }
-    
+
     LOG.info("getSplits : returning " + ret.size() + " row splits.");
     return ret;
   }
@@ -718,11 +914,8 @@
     			BasicTable.Reader reader =
     				new BasicTable.Reader(leaf.getPath(), conf );
     			reader.setProjection(leaf.getProjection());
-          if (sorted)
-          {
-    			  BasicTableStatus s = reader.getStatus();
-    		  	status.add(s);
-          }
+    			BasicTableStatus s = reader.getStatus();
+    		  status.add(s);
     			readers.add(reader);
           if (first)
             first = false;
@@ -742,7 +935,7 @@
 
     		return sorted ? 
     				singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) : 
-    					getRowSplits( conf, expr, readers);
+    					getRowSplits( conf, expr, readers, status);
     	} catch (ParseException e) {
     		throw new IOException("Projection parsing failed : "+e.getMessage());
     	} finally {

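The MultiPathFilter copied into both DummyFileInputFormat classes simply ANDs its component filters, so the built-in hiddenFileFilter always applies on top of the Zebra reader's own path filter. A hypothetical demonstration of that AND semantics (reimplemented standalone here, since both the filter class and hiddenFileFilter are private):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    class FilterSketch {
      public static void main(String[] args) {
        PathFilter hidden = new PathFilter() {   // same rule as hiddenFileFilter above
          public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
          }
        };
        PathFilter noTmp = new PathFilter() {    // stand-in for a user-provided filter
          public boolean accept(Path p) { return !p.getName().endsWith(".tmp"); }
        };
        // MultiPathFilter semantics: accept only if every component filter accepts.
        List<PathFilter> filters = Arrays.asList(hidden, noTmp);
        Path p = new Path("/table/cg0/_temporary");
        boolean accepted = true;
        for (PathFilter f : filters)
          accepted = accepted && f.accept(p);
        System.out.println(accepted);            // false: rejected by the hidden-file rule
      }
    }
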
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java Mon Mar  8 23:20:34 2010
@@ -174,13 +174,13 @@
         false }, numRows);
 
     TestBasicTable.doRangeSplit(new int[] { 4, 0, 2 }, numRows,
-        "a, b, e, f, x", path);
+        "a, b, c, e, f, x", path);
 
     // Remove another CG.
     BasicTable.dropColumnGroup(path, conf, "CG0");
 
     TestBasicTable.doRangeSplit(new int[] { 4, 0, 2, 3, 1 }, numRows,
-        "a, y, e, f, x", path);
+        "a, y, c, e, f, x", path);
 
     BasicTable.drop(path, conf);
   }