Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/03 02:08:16 UTC

svn commit: r905856 - in /hadoop/pig/branches/branch-0.6/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/pig/

Author: yanz
Date: Wed Feb  3 01:08:15 2010
New Revision: 905856

URL: http://svn.apache.org/viewvc?rev=905856&view=rev
Log:
PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
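
For context, the fix attacks the two costs named above. Each mapper used to re-derive TFile lengths with its own name node call, and Pig's Slice implementation serialized a near-complete copy of the JobConf into every input split. A minimal sketch of the first pattern, with hypothetical class and variable names (the real call sites are in ColumnGroup.Reader below):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StatPerMapper {
      // Illustration only: every mapper running this performs a
      // getFileStatus() round trip to the name node, even though the length
      // of an immutable TFile could be computed once at planning time and
      // shipped inside the split.
      static long lengthViaNameNode(Configuration conf, Path tfilePath)
          throws IOException {
        FileSystem fs = tfilePath.getFileSystem(conf);
        FileStatus st = fs.getFileStatus(tfilePath); // one RPC per mapper
        return st.getLen();
      }
    }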

Modified:
    hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
    hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Wed Feb  3 01:08:15 2010
@@ -39,6 +39,8 @@
 
   BUG FIXES
 
+    PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
+
     PIG-1167: Hadoop file glob support (yanz)
 
     PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Wed Feb  3 01:08:15 2010
@@ -95,6 +95,8 @@
 
   private final static String DELETED_CG_PREFIX = ".deleted-";
   
+  public final static String DELETED_CG_SEPARATOR_PER_TABLE = ",";
+
   // no public ctor for instantiating a BasicTable object
   private BasicTable() {
     // no-op
@@ -142,7 +144,7 @@
     /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
     while (triedCount ++ < numCGs) {
       try {
-        schemaFile = new SchemaFile(path, conf);
+        schemaFile = new SchemaFile(path, null, conf);
         break;
       } catch (FileNotFoundException e) {
         LOG.info("Try " + triedCount + " times : " + e.getMessage());
@@ -198,7 +200,7 @@
     } catch (IOException e) {
       // one remote possibility is that another user 
       // already deleted CG. 
-      SchemaFile tempSchema = new SchemaFile(path, conf);
+      SchemaFile tempSchema = new SchemaFile(path, null, conf);
       if (tempSchema.isCGDeleted(cgIdx)) {
         LOG.info(path + " : " + cgName + 
                  " is deleted by someone else. That is ok.");
@@ -281,10 +283,15 @@
      *          Optional configuration parameters.
      * @throws IOException
      */
+
     public Reader(Path path, Configuration conf) throws IOException {
+      this(path, null, conf);
+    }
+    public Reader(Path path, String[] deletedCGs, Configuration conf) throws IOException {
       try {
+        boolean mapper = (deletedCGs != null);
         this.path = path;
-        schemaFile = new SchemaFile(path, conf);
+        schemaFile = new SchemaFile(path, deletedCGs, conf);
         metaReader = MetaFile.createReader(new Path(path, BT_META_FILE), conf);
         // create column group readers
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
@@ -301,7 +308,7 @@
           if (!schemaFile.isCGDeleted(nx)) {
             colGroups[nx] =
               new ColumnGroup.Reader(new Path(path, partition.getCGSchema(nx).getName()),
-                                     conf);
+                                     conf, mapper);
             if (firstValidCG < 0) {
               firstValidCG = nx;
             }
@@ -311,7 +318,8 @@
           else
             cgTuples[nx] = null;
         }
-        buildStatus();
+        if (schemaFile.isSorted())
+          buildStatus();
         closed = false;
       }
       catch (Exception e) {
@@ -410,7 +418,9 @@
     /**
      * Get the status of the BasicTable.
      */
-    public BasicTableStatus getStatus() {
+    public BasicTableStatus getStatus() throws IOException {
+      if (status == null)
+        buildStatus();
       return status;
     }
 
@@ -565,13 +575,16 @@
      * 
      * @param path
      *          The path to the BasicTable.
+     * @param deletedCGs
+     *          Deleted column groups from the front end; null if unavailable
      * @param conf
      * @return The logical Schema of the table (all columns).
      * @throws IOException
      */
     public static Schema getSchema(Path path, Configuration conf)
         throws IOException {
-      SchemaFile schF = new SchemaFile(path, conf);
+      // fake an empty deleted cg list as getSchema does not care about deleted cgs
+      SchemaFile schF = new SchemaFile(path, new String[0], conf);
       return schF.getLogical();
     }
 
@@ -653,7 +666,7 @@
      * Get index of the column group that will be used for row-based split. 
      * 
      */
-    public int getRowSplitCGIndex() {
+    public int getRowSplitCGIndex() throws IOException {
       // Try to find the largest non-deleted and used column group by projection;
       int largestCGIndex = -1;
       int splitCGIndex = -1;
@@ -725,8 +738,12 @@
     String getStorageString() {
       return schemaFile.getStorageString();
     }
+    
+    public String getDeletedCGs() {
+      return schemaFile.getDeletedCGs();
+    }
 
-    private void buildStatus() {
+    private void buildStatus() throws IOException {
       status = new BasicTableStatus();
       if (firstValidCG >= 0) {
         status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
@@ -913,11 +930,12 @@
         int cgIdx = rowSplit.getCGIndex();
         
         CGRowSplit cgSplit = new CGRowSplit();
-        cgSplit.fileIndex = inputCGSplit.fileIndex;
+        cgSplit.name = inputCGSplit.name;
         // startByte and numBytes from inputCGSplit are ignored, since
         // they make sense for only one CG.
         cgSplit.startRow = inputCGSplit.startRow;
         cgSplit.numRows = inputCGSplit.numRows;
+        cgSplit.size = inputCGSplit.size;
         
         if (cgSplit.startRow >= 0) {
           //assume the rows are already set up.
@@ -1337,9 +1355,11 @@
      * thrown if the table is already closed, or is in the process of being
      * closed.
      */
+    
     public Writer(Path path, Configuration conf) throws IOException {
       try {
-        schemaFile = new SchemaFile(path, conf);
+        // fake an empty deleted cg list since no CG can have been deleted yet
+        schemaFile = new SchemaFile(path, new String[0], conf);
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
         sorted = schemaFile.isSorted();
@@ -1650,8 +1670,8 @@
     boolean[] cgDeletedFlags;
    
     // ctor for reading
-    public SchemaFile(Path path, Configuration conf) throws IOException {
-      readSchemaFile(path, conf);
+    public SchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException {
+      readSchemaFile(path, deletedCGs, conf);
     }
 
     public Schema[] getPhysicalSchema() {
@@ -1798,7 +1818,7 @@
       outSchema.close();
     }
 
-    private void readSchemaFile(Path path, Configuration conf)
+    private void readSchemaFile(Path path, String[] deletedCGs, Configuration conf)
         throws IOException {
       Path pathSchema = makeSchemaFilePath(path);
       if (!path.getFileSystem(conf).exists(pathSchema)) {
@@ -1845,7 +1865,18 @@
         throw new IOException("parser.RecordSchema failed :" + e.getMessage());
       }
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
-      setCGDeletedFlags(path, conf);
+      if (deletedCGs == null)
+        setCGDeletedFlags(path, conf);
+      else {
+        for (String deletedCG : deletedCGs)
+        {
+          for (int i = 0; i < cgschemas.length; i++)
+          {
+            if (cgschemas[i].getName().equals(deletedCG))
+              cgDeletedFlags[i] = true;
+          }
+        }
+      }
       if (version.compareTo(new Version((short)1, (short)0)) > 0)
       {
         int numSortColumns = WritableUtils.readVInt(in);
@@ -1915,7 +1946,23 @@
       }
     }
     
-    
+    String getDeletedCGs() {
+      StringBuilder sb = new StringBuilder();
+      // comma separated
+      boolean first = true;
+      for (int i = 0; i < physical.length; i++) {
+        if (cgDeletedFlags[i])
+        {
+          if (first)
+            first = false;
+          else {
+            sb.append(DELETED_CG_SEPARATOR_PER_TABLE);
+          }
+          sb.append(getName(i));
+        }
+      }
+      return sb.toString();
+    }
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf)
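
The deleted-CG handling above is a simple encode/decode protocol: the front end joins the deleted column group names with the per-table separator, ships the string through the job configuration, and each mapper sets its flags from that string instead of listing the table directory. A standalone sketch of the round trip, with made-up CG names (the real logic lives in SchemaFile.getDeletedCGs() and readSchemaFile() above):

    import java.util.Arrays;

    public class DeletedCGRoundTrip {
      static final String SEP = ","; // BasicTable.DELETED_CG_SEPARATOR_PER_TABLE

      // Front end: join deleted CG names once, as getDeletedCGs() does.
      static String encode(String[] cgNames, boolean[] deleted) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < cgNames.length; i++) {
          if (!deleted[i]) continue;
          if (sb.length() > 0) sb.append(SEP);
          sb.append(cgNames[i]);
        }
        return sb.toString();
      }

      // Mapper side: set flags from the shipped list with no file system
      // access, mirroring the deletedCGs branch of readSchemaFile().
      static boolean[] decode(String[] cgNames, String[] deletedCGs) {
        boolean[] flags = new boolean[cgNames.length];
        for (String d : deletedCGs)
          for (int i = 0; i < cgNames.length; i++)
            if (cgNames[i].equals(d)) flags[i] = true;
        return flags;
      }

      public static void main(String[] args) {
        String[] names = {"cg0", "cg1", "cg2"}; // hypothetical CG names
        String wire = encode(names, new boolean[]{false, true, true}); // "cg1,cg2"
        System.out.println(Arrays.toString(decode(names, wire.split(SEP, -1))));
      }
    }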

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Wed Feb  3 01:08:15 2010
@@ -225,6 +225,7 @@
     SplitColumn top; // directly associated with logical schema
     SplitColumn leaf; // corresponding to projection
     boolean closed;
+    boolean dirty;
 
     /**
      * Get the Column Group physical schema without loading the full CG index.
@@ -255,13 +256,24 @@
      */
     public Reader(Path path, Configuration conf) throws IOException,
       ParseException {
-      this(path, true, conf);
+      this(path, conf, false);
     }
-
+    
+    public Reader(Path path, Configuration conf, boolean mapper) throws IOException,
+      ParseException {
+      this(path, true, conf, mapper);
+    }
+    
     Reader(Path path, boolean dirty, Configuration conf) throws IOException,
       ParseException {
+      this(path, dirty, conf, false);
+    }
+
+    Reader(Path path, boolean dirty, Configuration conf, boolean mapper) throws IOException,
+      ParseException {
       this.path = path;
       this.conf = conf;
+      this.dirty = dirty;
 
       fs = path.getFileSystem(conf);
       // check existence of path
@@ -269,7 +281,7 @@
         throw new IOException("Path doesn't exist: " + path);
       }
 
-      if (!fs.getFileStatus(path).isDir()) {
+      if (!mapper && !fs.getFileStatus(path).isDir()) {
         throw new IOException("Path exists but not a directory: " + path);
       }
 
@@ -279,8 +291,8 @@
       }
       projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
       Path metaFilePath = makeMetaFilePath(path);
-      /* If index file is not existing or loading from an unsorted table. */
-      if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
+      /* If the index file does not exist */
+      if (!fs.exists(metaFilePath)) {
         // special case for unsorted CG that did not create index properly.
         if (cgschema.isSorted()) {
           throw new FileNotFoundException(
@@ -288,15 +300,16 @@
         }
         cgindex = buildIndex(fs, path, dirty, conf);
       }
-      else {
+      else if (cgschema.isSorted()) {
         MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf);
         try {
           cgindex = new CGIndex();
           DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX);
           try {
             cgindex.readFields(dis);
-          }
-          finally {
+          } catch (IOException e) {
+            throw new IOException("Index file read failure :"+ e.getMessage());
+          } finally {
             dis.close();
           }
         }
@@ -429,6 +442,8 @@
       }
 
       if (split == null) {
+        if (cgindex == null)
+          cgindex = buildIndex(fs, path, dirty, conf);
         return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader);
       }
       if (split.len < 0) {
@@ -474,6 +489,8 @@
         return getBlockDistribution(new CGRangeSplit(0, cgindex.size()));
       }
 
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) {
         throw new IndexOutOfBoundsException("Bad split");
       }
@@ -509,10 +526,9 @@
       }
 
       BlockDistribution ret = new BlockDistribution();
-      if (split.fileIndex >= 0)
+      if (split.name != null)
       {
-        CGIndexEntry entry = cgindex.get(split.fileIndex);
-        FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
+        FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name)); 
         
         BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
         for (BlockLocation l : locations) {
@@ -532,17 +548,26 @@
     void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
                       throws IOException {
 
-      if (rowSplit.fileIndex < 0)
+      if (rowSplit.name == null)
         return;
 
-      Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
-      FileStatus tfile = fs.getFileStatus(tfPath);
+      Path tfPath = new Path(path, rowSplit.name);
 
+      long size = rowSplit.size;
+      if (size == 0)
+      {
+        /* size == 0 means the on-disk table is sorted and its index lacks
+         * per-file sizes. This fallback becomes unnecessary once CGIndexEntry
+         * serializes its bytes field and meta file versioning is supported.
+         */
+        FileStatus tfile = fs.getFileStatus(tfPath);
+        size = tfile.getLen();
+      }
       TFile.Reader reader = null;
       
       try {
         reader = new TFile.Reader(fs.open(tfPath),
-                                  tfile.getLen(), conf);
+                                  size, conf);
 
         long startRow = reader.getRecordNumNear(startOffset);
         long endRow = reader.getRecordNumNear(startOffset + length);
@@ -703,7 +728,9 @@
     /**
      * Get the status of the ColumnGroup.
      */
-    public BasicTableStatus getStatus() {
+    public BasicTableStatus getStatus() throws IOException {
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       return cgindex.status;
     }
 
@@ -715,10 +742,12 @@
      * @return A list of range-based splits, whose size may be less than or
      *         equal to n.
      */
-    public List<CGRangeSplit> rangeSplit(int n) {
+    public List<CGRangeSplit> rangeSplit(int n) throws IOException {
       // The output of this method must be only dependent on the cgindex and
       // input parameter n - so that horizontally stitched column groups will
       // get aligned splits.
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       int numFiles = cgindex.size();
       if ((numFiles < n) || (n < 0)) {
         return rangeSplit(numFiles);
@@ -752,8 +781,10 @@
         long start = starts[i];
         long length = lengths[i];
         Path path = paths[i];
-        int idx = cgindex.getFileIndex(path);        
-        lst.add(new CGRowSplit(idx, start, length));
+        if (cgindex == null)
+          cgindex = buildIndex(fs, this.path, dirty, conf);
+        long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
+        lst.add(new CGRowSplit(path.getName(), start, length, size));
       }
       
       return lst;
@@ -796,7 +827,7 @@
            * compressor is inside cgschema
            */
           reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
-          if (rowRange != null && rowRange.fileIndex >= 0) {
+          if (rowRange != null) {
             scanner = reader.createScannerByRecordNum(rowRange.startRow, 
                                          rowRange.startRow + rowRange.numRows);
           } else {
@@ -921,6 +952,8 @@
 
       CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
       ParseException {
+        if (cgindex == null)
+          cgindex = buildIndex(fs, path, dirty, conf);
         if (split == null) {
           beginIndex = 0;
           endIndex = cgindex.size();
@@ -940,15 +973,9 @@
        */
       CGScanner(CGRowSplit rowRange, boolean closeReader) 
                  throws IOException, ParseException {
+        
         beginIndex = 0;
-        endIndex = cgindex.size();
-        if (rowRange != null && rowRange.fileIndex>= 0) {
-          if (rowRange.fileIndex >= cgindex.size()) {
-            throw new IllegalArgumentException("Part Index is out of range.");
-          }
-          beginIndex = rowRange.fileIndex;
-          endIndex = beginIndex+1;
-        }
+        endIndex = 1;
         init(rowRange, null, null, closeReader);
       }
       
@@ -981,8 +1008,15 @@
           for (int i = beginIndex; i < endIndex; ++i) {
             RawComparable begin = (i == beginIndex) ? beginKey : null;
             RawComparable end = (i == endIndex - 1) ? endKey : null;
-            TFileScanner scanner =
-                new TFileScanner(fs, cgindex.getPath(i, path), rowRange, 
+            TFileScanner scanner;
+            if (rowRange != null)
+              scanner =
+                new TFileScanner(fs, new Path(path, rowRange.name), rowRange, 
+                                 begin, end,
+                                 cgschema, logicalSchema, conf);
+            else
+              scanner =
+                new TFileScanner(fs, cgindex.getPath(i, path), null, 
                                  begin, end,
                                  cgschema, logicalSchema, conf);
             // skip empty scanners.
@@ -1160,16 +1194,18 @@
     }
     
     public static class CGRowSplit implements Writable {
-      int fileIndex = -1;
+      String name;
       long startByte = -1;
       long numBytes = -1;
       long startRow = -1;
       long numRows = -1;
+      long size = 0; // size of the file in the selected CG 
 
-      CGRowSplit(int fileIdx, long start, long len) {
-        this.fileIndex = fileIdx;
+      CGRowSplit(String name, long start, long len, long size) {
+        this.name = name;
         this.startByte = start;
         this.numBytes = len;
+        this.size = size;
       }
 
       public CGRowSplit() {
@@ -1179,31 +1215,34 @@
       @Override
       public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("{fileIndex = " + fileIndex + "}\n");       
+        sb.append("{name = " + name + "}\n");       
         sb.append("{startByte = " + startByte + "}\n");
         sb.append("{numBytes = " + numBytes + "}\n");
         sb.append("{startRow = " + startRow + "}\n");
         sb.append("{numRows = " + numRows + "}\n");
+        sb.append("{size = " + size + "}\n");
         
         return sb.toString();
       }
 
       @Override
       public void readFields(DataInput in) throws IOException {
-        fileIndex = Utils.readVInt(in);
+        name = Utils.readString(in);
         startByte = Utils.readVLong(in);
         numBytes = Utils.readVLong(in);
         startRow = Utils.readVLong(in);
         numRows = Utils.readVLong(in);
+        size = Utils.readVLong(in);
       }
 
       @Override
       public void write(DataOutput out) throws IOException {
-        Utils.writeVInt(out, fileIndex);
+        Utils.writeString(out, name);
         Utils.writeVLong(out, startByte);
         Utils.writeVLong(out, numBytes);
         Utils.writeVLong(out, startRow);
         Utils.writeVLong(out, numRows);
+        Utils.writeVLong(out, size);
       }      
     }
     
@@ -1444,23 +1483,13 @@
     private void createIndex() throws IOException {
       MetaFile.Writer metaFile =
           MetaFile.createWriter(makeMetaFilePath(path), conf);
-      if (cgschema.isSorted()) {
-        CGIndex index = buildIndex(fs, path, false, conf);
-        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-        try {
-          index.write(dos);
-        }
-        finally {
-          dos.close();
-        }
-      } else { /* Create an empty data meta file for unsorted table. */
-        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-        try {
-          Utils.writeString(dos, "");
-        } 
-        finally {
-          dos.close();
-        }
+      CGIndex index = buildIndex(fs, path, false, conf);
+      DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+      try {
+        index.write(dos);
+      }
+      finally {
+        dos.close();
       }
       metaFile.close();
     }
@@ -1689,7 +1718,7 @@
   static class CGIndexEntry implements RawComparable, Writable {
     int index;
     String name;
-    long rows;
+    long rows, bytes;
     RawComparable firstKey;
     RawComparable lastKey;
 
@@ -1890,6 +1919,7 @@
       status.rows += rows;
       index.add(range);
       sorted = false;
+      range.bytes = bytes;
     }
 
     // building dirty index
@@ -1901,6 +1931,7 @@
       next.name = name;
       index.add(next);
       sorted = false;
+      next.bytes = bytes;
     }
 
     int lowerBound(RawComparable key, final Comparator<RawComparable> comparator)
@@ -1935,6 +1966,7 @@
       for (int i = 0; i < n; ++i) {
         CGIndexEntry range = new CGIndexEntry();
         range.readFields(in);
+        range.setIndex(i);
         index.add(range);
       }
       status.readFields(in);
@@ -2035,6 +2067,8 @@
         out.printf("%s : %s\n", e.getKey(), e.getValue());
       }
       out.println("TFiles within the Column Group :");
+      if (reader.cgindex == null)
+        reader.cgindex = buildIndex(reader.fs, reader.path, reader.dirty, conf);
       for (CGIndexEntry entry : reader.cgindex.index) {
         IOutils.indent(out, indent);
         out.printf(" *Name : %s\n", entry.name);
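
The net effect of the CGRowSplit changes above is that a split is now self-describing: it carries the TFile name and its length, so fillRowSplit() and the scanners can open the file without first asking the name node for its status (getFileStatus() survives only as the size == 0 fallback). A minimal Writable mirroring those fields, shown as a sketch rather than the exact class:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    // Sketch: the file is identified by name (stable across index rebuilds)
    // instead of an index position, and its length travels with the split so
    // mappers need no name node round trip to open it.
    public class RowSplitSketch implements Writable {
      String name;     // TFile name inside the column group directory
      long startRow = -1;
      long numRows = -1;
      long size = 0;   // file length; 0 means "fall back to getFileStatus()"

      @Override
      public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, name);
        WritableUtils.writeVLong(out, startRow);
        WritableUtils.writeVLong(out, numRows);
        WritableUtils.writeVLong(out, size);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        name = WritableUtils.readString(in);
        startRow = WritableUtils.readVLong(in);
        numRows = WritableUtils.readVLong(in);
        size = WritableUtils.readVLong(in);
      }
    }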

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Wed Feb  3 01:08:15 2010
@@ -106,7 +106,8 @@
   @Override
   public TableScanner getScanner(BytesWritable begin, BytesWritable end,
       String projection, Configuration conf) throws IOException {
-    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    String[] deletedCGs = getDeletedCGs(conf);
+    BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
     try {
       reader.setProjection(projection);
     } catch (ParseException e) {

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Wed Feb  3 01:08:15 2010
@@ -27,7 +27,6 @@
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 
 /**
@@ -105,7 +104,6 @@
    * @see Schema
    * @return A TableScanner object.
    */
-  @SuppressWarnings("unused")
   public TableScanner getScanner(BytesWritable begin,
       BytesWritable end, String projection, Configuration conf)
       throws IOException {
@@ -127,7 +125,7 @@
   public TableScanner getScanner(UnsortedTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(split.getSplit(), true);
   }
@@ -147,7 +145,7 @@
   public TableScanner getScanner(RowTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(true, split.getSplit());
   }
@@ -240,4 +238,31 @@
    * dump table info with indent
    */
   protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+  
+  /**
+   * Get the deleted CGs for the tables in a union
+   * @param conf The Configuration object
+   * @return Array of per-table deleted CG strings; null if not set by the front end
+   */
+  protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+    return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+  }
+  
+  protected final String[] getDeletedCGs(Configuration conf) {
+    return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+  }
+  
+  private final String[] getDeletedCGs(Configuration conf, String separator) {
+    String[] deletedCGs = null;
+    String fe;
+    if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+    {
+      String original = conf.get(TableInputFormat.INPUT_DELETEED_CGS, null);
+      if (original == null)
+        deletedCGs = new String[0]; // empty array signals the front end checked and found none deleted
+      else
+        deletedCGs = original.split(separator, -1);
+    }
+    return deletedCGs;
+  }
 }
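
Two details in getDeletedCGs() above are easy to miss. A null return means the front end never ran (no INPUT_FE flag), while an empty array means it ran and found nothing deleted; and the -1 limit passed to split() keeps trailing empty entries, so per-table positions in a union stay aligned. A small demonstration of the limit's effect, with made-up CG names:

    import java.util.Arrays;

    public class SplitLimitDemo {
      public static void main(String[] args) {
        // Three tables in a union; only the middle one has a deleted CG.
        String wire = ";cg1;"; // per-union separator is ";"

        // Default split() drops trailing empties: positions no longer line up.
        System.out.println(Arrays.toString(wire.split(";")));     // [, cg1]

        // split(sep, -1) keeps them: one entry per table.
        System.out.println(Arrays.toString(wire.split(";", -1))); // [, cg1, ]
      }
    }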

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Wed Feb  3 01:08:15 2010
@@ -140,9 +140,12 @@
 public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
   static Log LOG = LogFactory.getLog(TableInputFormat.class);
   
-  private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
-  private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
-  private static final String INPUT_SORT = "mapred.lib.table.input.sort";
+  public static final String INPUT_EXPR = "mapred.lib.table.input.expr";
+  public static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+  public static final String INPUT_SORT = "mapred.lib.table.input.sort";
+  public static final String INPUT_FE = "mapred.lib.table.input.fe";
+  public static final String INPUT_DELETEED_CGS = "mapred.lib.table.input.deleted_cgs";
+  static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
 
   /**
    * Set the paths to the input table.
@@ -642,8 +645,7 @@
   }
   
   private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
-      TableExpr expr, List<BasicTable.Reader> readers, 
-      List<BasicTableStatus> status) throws IOException {
+      TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
 
@@ -715,25 +717,40 @@
         new ArrayList<BasicTableStatus>(nLeaves);
 
     try {
+      StringBuilder sb = new StringBuilder();
+      boolean sorted = expr.sortedSplitRequired();
+      boolean first = true;
       for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
         LeafTableInfo leaf = it.next();
         BasicTable.Reader reader =
           new BasicTable.Reader(leaf.getPath(), conf);
         reader.setProjection(leaf.getProjection());
-        BasicTableStatus s = reader.getStatus();
+        if (sorted)
+        {
+          BasicTableStatus s = reader.getStatus();
+          status.add(s);
+        }
         readers.add(reader);
-        status.add(s);
+        if (first)
+          first = false;
+        else {
+          sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+        }
+        sb.append(reader.getDeletedCGs());
       }
       
+      conf.set(INPUT_FE, "true");
+      conf.set(INPUT_DELETEED_CGS, sb.toString());
+      
       if (readers.isEmpty()) {
         return new InputSplit[0];
       }
       
-      if (expr.sortedSplitRequired()) {
+      if (sorted) {
         return getSortedSplits(conf, numSplits, expr, readers, status);
       }
        
-      return getRowSplits(conf, numSplits, expr, readers, status);
+      return getRowSplits(conf, numSplits, expr, readers);
     } catch (ParseException e) {
       throw new IOException("Projection parsing failed : "+e.getMessage());
     }
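
The loop above is the front-end half of the protocol: one deleted-CG string per leaf table, joined with the per-union separator and published through the configuration, so back-end readers never have to rescan table directories. A sketch of just the publishing step, with the reader list replaced by plain strings (note the real constant is spelled INPUT_DELETEED_CGS, though its key is "mapred.lib.table.input.deleted_cgs"):

    import org.apache.hadoop.conf.Configuration;

    public class PublishDeletedCGs {
      static final String INPUT_FE = "mapred.lib.table.input.fe";
      static final String INPUT_DELETED_CGS =
          "mapred.lib.table.input.deleted_cgs";
      static final String SEP_PER_UNION = ";";

      // perTable: one comma-separated deleted-CG string per table in the
      // union, as returned by BasicTable.Reader.getDeletedCGs().
      static void publish(Configuration conf, String[] perTable) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < perTable.length; i++) {
          if (i > 0) sb.append(SEP_PER_UNION);
          sb.append(perTable[i]); // may be empty: no deleted CGs in table i
        }
        conf.set(INPUT_FE, "true");                 // front end has checked
        conf.set(INPUT_DELETED_CGS, sb.toString()); // e.g. "cg1,cg2;;cg0"
      }
    }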

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Wed Feb  3 01:08:15 2010
@@ -105,14 +105,20 @@
       throw new IllegalArgumentException("Union of 0 table");
     }
     ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
-    final ArrayList<BasicTableStatus> status =
-        new ArrayList<BasicTableStatus>(n);
+    String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+    
+    if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+      throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+          n + " actual = " + deletedCGsInUnion.length);
+    
     for (int i = 0; i < n; ++i) {
+      String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+      String[] deletedCGList = (deletedCGs == null ? null : 
+        deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
       BasicTableExpr expr = (BasicTableExpr) composite.get(i);
       BasicTable.Reader reader =
-          new BasicTable.Reader(expr.getPath(), conf);
+          new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
       readers.add(reader);
-      status.add(reader.getStatus());
     }
 
     String actualProjection = projection;
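
And this is the back-end half: the per-union string decodes into one entry per table (the count must match n), and each entry decodes again into CG names. A sketch of the two-level decode with a hypothetical wire value:

    import java.util.Arrays;

    public class DecodeUnionDeletedCGs {
      public static void main(String[] args) {
        // Hypothetical wire value for a 3-table union: table 0 lost cg1 and
        // cg2, table 1 lost nothing, table 2 lost cg0.
        String wire = "cg1,cg2;;cg0";

        String[] perTable = wire.split(";", -1); // one entry per table, n == 3
        for (int i = 0; i < perTable.length; i++) {
          String[] cgs = perTable[i].isEmpty()
              ? new String[0] : perTable[i].split(",");
          System.out.println("table " + i + " -> " + Arrays.toString(cgs));
        }
      }
    }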

Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Wed Feb  3 01:08:15 2010
@@ -430,6 +430,9 @@
 		private TreeMap<String, String> configMap;
 		private InputSplit split;
     
+    transient private final String[] zebraConfs = {TableInputFormat.INPUT_EXPR, 
+        TableInputFormat.INPUT_PROJ, TableInputFormat.INPUT_SORT, 
+        TableInputFormat.INPUT_DELETEED_CGS, TableInputFormat.INPUT_FE, "mapred.input.dir"};
 		transient private JobConf conf;
 		transient private int numProjCols = 0;
 		transient private RecordReader<BytesWritable, Tuple> scanner;
@@ -437,16 +440,16 @@
     transient private boolean sorted = false;
 
 		TableSlice(JobConf conf, InputSplit split, boolean sorted) {
-			// hack: expecting JobConf contains nothing but a <string, string>
-			// key-value pair store.
 			configMap = new TreeMap<String, String>();
-			for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) {
-				Map.Entry<String, String> e = it.next();
-				configMap.put(e.getKey(), e.getValue());
-			}
-			
-			
-			
+      String value;
+
+      for (String zebraConf : zebraConfs)
+      {
+        value = conf.get(zebraConf);
+        if (value != null)
+          configMap.put(zebraConf, value);
+      }
+
 			this.split = split;
 			this.sorted = sorted;
 		}
@@ -500,14 +503,14 @@
 
 		@Override
 		public void init(DataStorage store) throws IOException {
-			Configuration localConf = new Configuration();
+			Configuration localConf = ConfigurationUtil.toConfiguration(store.getConfiguration());
 			for (Iterator<Map.Entry<String, String>> it =
 				configMap.entrySet().iterator(); it.hasNext();) {
 				Map.Entry<String, String> e = it.next();
 				localConf.set(e.getKey(), e.getValue());
 			}
 			conf = new JobConf(localConf);
-			String projection;			
+			String projection;
 			try
 			{
 				projection = TableInputFormat.getProjection(conf);
@@ -516,8 +519,8 @@
 			}
 			numProjCols = Projection.getNumColumns(projection);
 			TableInputFormat inputFormat = new TableInputFormat();
-			if (sorted)
-				TableInputFormat.requireSortedTable(conf, null);
+      if (sorted)
+        TableInputFormat.requireSortedTable(conf, null);
 			scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
 			key = new BytesWritable();
 		}