You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/18 18:01:31 UTC

svn commit: r911493 - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/test/org/apache/hadoop/zebra/io/ src/test/...

Author: yanz
Date: Thu Feb 18 17:01:30 2010
New Revision: 911493

URL: http://svn.apache.org/viewvc?rev=911493&view=rev
Log:
PIG-1227: Throw exception if column group meta file is missing for an unsorted table (yanz) ; PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)

Modified:
    hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Thu Feb 18 17:01:30 2010
@@ -58,6 +58,10 @@
 
   BUG FIXES
 
+    PIG-1227: Throw exception if column group meta file is missing for an unsorted table (yanz)
+
+    PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
+
     PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)
 
     PIG-1167: Hadoop file glob support (yanz)

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Thu Feb 18 17:01:30 2010
@@ -92,6 +92,8 @@
 
   private final static String DELETED_CG_PREFIX = ".deleted-";
   
+  public final static String DELETED_CG_SEPARATOR_PER_TABLE = ",";
+
   // no public ctor for instantiating a BasicTable object
   private BasicTable() {
     // no-op
@@ -139,7 +141,7 @@
     /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
     while (triedCount ++ < numCGs) {
       try {
-        schemaFile = new SchemaFile(path, conf);
+        schemaFile = new SchemaFile(path, null, conf);
         break;
       } catch (FileNotFoundException e) {
         LOG.info("Try " + triedCount + " times : " + e.getMessage());
@@ -195,7 +197,7 @@
     } catch (IOException e) {
       // one remote possibility is that another user 
       // already deleted CG. 
-      SchemaFile tempSchema = new SchemaFile(path, conf);
+      SchemaFile tempSchema = new SchemaFile(path, null, conf);
       if (tempSchema.isCGDeleted(cgIdx)) {
         LOG.info(path + " : " + cgName + 
                  " is deleted by someone else. That is ok.");
@@ -278,10 +280,15 @@
      *          Optional configuration parameters.
      * @throws IOException
      */
+
     public Reader(Path path, Configuration conf) throws IOException {
+      this(path, null, conf);
+    }
+    public Reader(Path path, String[] deletedCGs, Configuration conf) throws IOException {
       try {
+        boolean mapper = (deletedCGs != null);
         this.path = path;
-        schemaFile = new SchemaFile(path, conf);
+        schemaFile = new SchemaFile(path, deletedCGs, conf);
         metaReader = MetaFile.createReader(new Path(path, BT_META_FILE), conf);
         // create column group readers
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
@@ -298,7 +305,7 @@
           if (!schemaFile.isCGDeleted(nx)) {
             colGroups[nx] =
               new ColumnGroup.Reader(new Path(path, partition.getCGSchema(nx).getName()),
-                                     conf);
+                                     conf, mapper);
             if (firstValidCG < 0) {
               firstValidCG = nx;
             }
@@ -308,7 +315,8 @@
           else
             cgTuples[nx] = null;
         }
-        buildStatus();
+        if (schemaFile.isSorted())
+          buildStatus();
         closed = false;
       }
       catch (Exception e) {
@@ -407,7 +415,9 @@
     /**
      * Get the status of the BasicTable.
      */
-    public BasicTableStatus getStatus() {
+    public BasicTableStatus getStatus() throws IOException {
+      if (status == null)
+        buildStatus();
       return status;
     }
 
@@ -562,13 +572,16 @@
      * 
      * @param path
      *          The path to the BasicTable.
+     * @deletedCGs
+     *          The deleted column groups from front end; null if unavailable from front end
      * @param conf
      * @return The logical Schema of the table (all columns).
      * @throws IOException
      */
     public static Schema getSchema(Path path, Configuration conf)
         throws IOException {
-      SchemaFile schF = new SchemaFile(path, conf);
+      // fake an empty deleted cg list as getSchema does not care about deleted cgs
+      SchemaFile schF = new SchemaFile(path, new String[0], conf);
       return schF.getLogical();
     }
 
@@ -650,7 +663,7 @@
      * Get index of the column group that will be used for row-based split. 
      * 
      */
-    public int getRowSplitCGIndex() {
+    public int getRowSplitCGIndex() throws IOException {
       // Try to find the largest non-deleted and used column group by projection;
       int largestCGIndex = -1;
       int splitCGIndex = -1;
@@ -722,8 +735,12 @@
     String getStorageString() {
       return schemaFile.getStorageString();
     }
+    
+    public String getDeletedCGs() {
+      return schemaFile.getDeletedCGs();
+    }
 
-    private void buildStatus() {
+    private void buildStatus() throws IOException {
       status = new BasicTableStatus();
       if (firstValidCG >= 0) {
         status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
@@ -911,11 +928,12 @@
         int cgIdx = rowSplit.getCGIndex();
         
         CGRowSplit cgSplit = new CGRowSplit();
-        cgSplit.fileIndex = inputCGSplit.fileIndex;
+        cgSplit.name = inputCGSplit.name;
         // startByte and numBytes from inputCGSplit are ignored, since
         // they make sense for only one CG.
         cgSplit.startRow = inputCGSplit.startRow;
         cgSplit.numRows = inputCGSplit.numRows;
+        cgSplit.size = inputCGSplit.size;
         
         if (cgSplit.startRow >= 0) {
           //assume the rows are already set up.
@@ -1345,7 +1363,8 @@
       try {
       	actualOutputPath = path;
     	writerConf = conf;    	  
-        schemaFile = new SchemaFile(path, conf);
+        // fake an empty deleted cg list as no cg should have been deleted now
+        schemaFile = new SchemaFile(path, new String[0], conf);
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
         sorted = schemaFile.isSorted();
@@ -1677,8 +1696,8 @@
     boolean[] cgDeletedFlags;
    
     // ctor for reading
-    public SchemaFile(Path path, Configuration conf) throws IOException {
-      readSchemaFile(path, conf);
+    public SchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException {
+      readSchemaFile(path, deletedCGs, conf);
     }
 
     public Schema[] getPhysicalSchema() {
@@ -1825,7 +1844,7 @@
       outSchema.close();
     }
 
-    private void readSchemaFile(Path path, Configuration conf)
+    private void readSchemaFile(Path path, String[] deletedCGs, Configuration conf)
         throws IOException {
       Path pathSchema = makeSchemaFilePath(path);
       if (!path.getFileSystem(conf).exists(pathSchema)) {
@@ -1872,7 +1891,18 @@
         throw new IOException("parser.RecordSchema failed :" + e.getMessage());
       }
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
-      setCGDeletedFlags(path, conf);
+      if (deletedCGs == null)
+        setCGDeletedFlags(path, conf);
+      else {
+        for (String deletedCG : deletedCGs)
+        {
+          for (int i = 0; i < cgschemas.length; i++)
+          {
+            if (cgschemas[i].getName().equals(deletedCG))
+              cgDeletedFlags[i] = true;
+          }
+        }
+      }
       if (version.compareTo(new Version((short)1, (short)0)) > 0)
       {
         int numSortColumns = WritableUtils.readVInt(in);
@@ -1942,7 +1972,23 @@
       }
     }
     
-    
+    String getDeletedCGs() {
+      StringBuilder sb = new StringBuilder();
+      // comma separated
+      boolean first = true;
+      for (int i = 0; i < physical.length; i++) {
+        if (cgDeletedFlags[i])
+        {
+          if (first)
+            first = false;
+          else {
+            sb.append(DELETED_CG_SEPARATOR_PER_TABLE);
+          }
+          sb.append(getName(i));
+        }
+      }
+      return sb.toString();
+    }
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf)

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Thu Feb 18 17:01:30 2010
@@ -225,6 +225,7 @@
     SplitColumn top; // directly associated with logical schema
     SplitColumn leaf; // corresponding to projection
     boolean closed;
+    boolean dirty;
 
     /**
      * Get the Column Group physical schema without loading the full CG index.
@@ -255,13 +256,24 @@
      */
     public Reader(Path path, Configuration conf) throws IOException,
       ParseException {
-      this(path, true, conf);
+      this(path, conf, false);
     }
-
+    
+    public Reader(Path path, Configuration conf, boolean mapper) throws IOException,
+      ParseException {
+      this(path, true, conf, mapper);
+    }
+    
     Reader(Path path, boolean dirty, Configuration conf) throws IOException,
       ParseException {
+      this(path, dirty, conf, false);
+    }
+
+    Reader(Path path, boolean dirty, Configuration conf, boolean mapper) throws IOException,
+      ParseException {
       this.path = path;
       this.conf = conf;
+      this.dirty = dirty;
 
       fs = path.getFileSystem(conf);
       // check existence of path
@@ -269,7 +281,7 @@
         throw new IOException("Path doesn't exist: " + path);
       }
 
-      if (!fs.getFileStatus(path).isDir()) {
+      if (!mapper && !fs.getFileStatus(path).isDir()) {
         throw new IOException("Path exists but not a directory: " + path);
       }
 
@@ -279,24 +291,21 @@
       }
       projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
       Path metaFilePath = makeMetaFilePath(path);
-      /* If index file is not existing or loading from an unsorted table. */
-      if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
-        // special case for unsorted CG that did not create index properly.
-        if (cgschema.isSorted()) {
-          throw new FileNotFoundException(
-              "Missing Meta File for sorted Column Group");
-        }
-        cgindex = buildIndex(fs, path, dirty, conf);
+      /* If the index (meta) file does not exist */
+      if (!fs.exists(metaFilePath)) {
+        throw new FileNotFoundException(
+              "Missing Meta File of " + metaFilePath);
       }
-      else {
+      else if (cgschema.isSorted()) {
         MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf);
         try {
           cgindex = new CGIndex();
           DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX);
           try {
             cgindex.readFields(dis);
-          }
-          finally {
+          } catch (IOException e) {
+            throw new IOException("Index file read failure :"+ e.getMessage());
+          } finally {
             dis.close();
           }
         }
@@ -429,6 +438,8 @@
       }
 
       if (split == null) {
+        if (cgindex == null)
+          cgindex = buildIndex(fs, path, dirty, conf);
         return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader);
       }
       if (split.len < 0) {
@@ -474,6 +485,8 @@
         return getBlockDistribution(new CGRangeSplit(0, cgindex.size()));
       }
 
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) {
         throw new IndexOutOfBoundsException("Bad split");
       }
@@ -509,10 +522,9 @@
       }
 
       BlockDistribution ret = new BlockDistribution();
-      if (split.fileIndex >= 0)
+      if (split.name != null)
       {
-        CGIndexEntry entry = cgindex.get(split.fileIndex);
-        FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName())); 
+        FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name)); 
         
         BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
         for (BlockLocation l : locations) {
@@ -532,17 +544,26 @@
     void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length) 
                       throws IOException {
 
-      if (rowSplit.fileIndex < 0)
+      if (rowSplit.name == null)
         return;
 
-      Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
-      FileStatus tfile = fs.getFileStatus(tfPath);
+      Path tfPath = new Path(path, rowSplit.name);
 
+      long size = rowSplit.size;
+      if (size == 0)
+      {
+        /* the on disk table is sorted. Later this will be made unnecessary when
+         * CGIndexEntry serializes its bytes field and the meta file versioning is
+         * supported.
+         */ 
+        FileStatus tfile = fs.getFileStatus(tfPath);
+        size = tfile.getLen();
+      }
       TFile.Reader reader = null;
       
       try {
         reader = new TFile.Reader(fs.open(tfPath),
-                                  tfile.getLen(), conf);
+                                  size, conf);
 
         long startRow = reader.getRecordNumNear(startOffset);
         long endRow = reader.getRecordNumNear(startOffset + length);
@@ -703,7 +724,9 @@
     /**
      * Get the status of the ColumnGroup.
      */
-    public BasicTableStatus getStatus() {
+    public BasicTableStatus getStatus() throws IOException {
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       return cgindex.status;
     }
 
@@ -715,10 +738,12 @@
      * @return A list of range-based splits, whose size may be less than or
      *         equal to n.
      */
-    public List<CGRangeSplit> rangeSplit(int n) {
+    public List<CGRangeSplit> rangeSplit(int n) throws IOException {
       // The output of this method must be only dependent on the cgindex and
       // input parameter n - so that horizontally stitched column groups will
       // get aligned splits.
+      if (cgindex == null)
+        cgindex = buildIndex(fs, path, dirty, conf);
       int numFiles = cgindex.size();
       if ((numFiles < n) || (n < 0)) {
         return rangeSplit(numFiles);
@@ -752,8 +777,10 @@
         long start = starts[i];
         long length = lengths[i];
         Path path = paths[i];
-        int idx = cgindex.getFileIndex(path);        
-        lst.add(new CGRowSplit(idx, start, length));
+        if (cgindex == null)
+          cgindex = buildIndex(fs, this.path, dirty, conf);
+        long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
+        lst.add(new CGRowSplit(path.getName(), start, length, size));
       }
       
       return lst;
@@ -796,7 +823,7 @@
            * compressor is inside cgschema
            */
           reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
-          if (rowRange != null && rowRange.fileIndex >= 0) {
+          if (rowRange != null) {
             scanner = reader.createScannerByRecordNum(rowRange.startRow, 
                                          rowRange.startRow + rowRange.numRows);
           } else {
@@ -922,6 +949,8 @@
 
       CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
       ParseException {
+        if (cgindex== null)
+          cgindex = buildIndex(fs, path, dirty, conf);
         if (split == null) {
           beginIndex = 0;
           endIndex = cgindex.size();
@@ -941,15 +970,9 @@
        */
       CGScanner(CGRowSplit rowRange, boolean closeReader) 
                  throws IOException, ParseException {
+        
         beginIndex = 0;
-        endIndex = cgindex.size();
-        if (rowRange != null && rowRange.fileIndex>= 0) {
-          if (rowRange.fileIndex >= cgindex.size()) {
-            throw new IllegalArgumentException("Part Index is out of range.");
-          }
-          beginIndex = rowRange.fileIndex;
-          endIndex = beginIndex+1;
-        }
+        endIndex = 1;
         init(rowRange, null, null, closeReader);
       }
       
@@ -982,8 +1005,15 @@
           for (int i = beginIndex; i < endIndex; ++i) {
             RawComparable begin = (i == beginIndex) ? beginKey : null;
             RawComparable end = (i == endIndex - 1) ? endKey : null;
-            TFileScanner scanner =
-                new TFileScanner(fs, cgindex.getPath(i, path), rowRange, 
+            TFileScanner scanner;
+            if (rowRange != null)
+              scanner =
+                new TFileScanner(fs, new Path(path, rowRange.name), rowRange, 
+                                 begin, end,
+                                 cgschema, logicalSchema, conf);
+            else
+              scanner =
+                new TFileScanner(fs, cgindex.getPath(i, path), null, 
                                  begin, end,
                                  cgschema, logicalSchema, conf);
             // skip empty scanners.
@@ -1161,16 +1191,18 @@
     }
     
     public static class CGRowSplit implements Writable {
-      int fileIndex = -1;
+      String name;
       long startByte = -1;
       long numBytes = -1;
       long startRow = -1;
       long numRows = -1;
+      long size = 0; // size of the file in the selected CG 
 
-      CGRowSplit(int fileIdx, long start, long len) {
-        this.fileIndex = fileIdx;
+      CGRowSplit(String name, long start, long len, long size) {
+        this.name = name;
         this.startByte = start;
         this.numBytes = len;
+        this.size = size;
       }
 
       public CGRowSplit() {
@@ -1180,31 +1212,34 @@
       @Override
       public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("{fileIndex = " + fileIndex + "}\n");       
+        sb.append("{name = " + name + "}\n");       
         sb.append("{startByte = " + startByte + "}\n");
         sb.append("{numBytes = " + numBytes + "}\n");
         sb.append("{startRow = " + startRow + "}\n");
         sb.append("{numRows = " + numRows + "}\n");
+        sb.append("{size = " + size + "}\n");
         
         return sb.toString();
       }
 
       @Override
       public void readFields(DataInput in) throws IOException {
-        fileIndex = Utils.readVInt(in);
+        name = Utils.readString(in);
         startByte = Utils.readVLong(in);
         numBytes = Utils.readVLong(in);
         startRow = Utils.readVLong(in);
         numRows = Utils.readVLong(in);
+        size = Utils.readVLong(in);
       }
 
       @Override
       public void write(DataOutput out) throws IOException {
-        Utils.writeVInt(out, fileIndex);
+        Utils.writeString(out, name);
         Utils.writeVLong(out, startByte);
         Utils.writeVLong(out, numBytes);
         Utils.writeVLong(out, startRow);
         Utils.writeVLong(out, numRows);
+        Utils.writeVLong(out, size);
       }      
     }
     
@@ -1466,24 +1501,14 @@
 
     private void createIndex() throws IOException {
       MetaFile.Writer metaFile =
-          MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
-      if (cgschema.isSorted()) {
-        CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
-        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-        try {
-          index.write(dos);
-        }
-        finally {
-          dos.close();
-        }
-      } else { /* Create an empty data meta file for unsorted table. */
-        DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
-        try {
-          Utils.writeString(dos, "");
-        } 
-        finally {
-          dos.close();
-        }
+      MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
+      CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
+      DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+      try {
+        index.write(dos);
+      }
+      finally {
+        dos.close();
       }
       metaFile.close();
     }
@@ -1714,7 +1739,7 @@
   static class CGIndexEntry implements RawComparable, Writable {
     int index;
     String name;
-    long rows;
+    long rows, bytes;
     RawComparable firstKey;
     RawComparable lastKey;
 
@@ -1915,6 +1940,7 @@
       status.rows += rows;
       index.add(range);
       sorted = false;
+      range.bytes = bytes;
     }
 
     // building dirty index
@@ -1926,6 +1952,7 @@
       next.name = name;
       index.add(next);
       sorted = false;
+      next.bytes = bytes;
     }
 
     int lowerBound(RawComparable key, final Comparator<RawComparable> comparator)
@@ -1960,6 +1987,7 @@
       for (int i = 0; i < n; ++i) {
         CGIndexEntry range = new CGIndexEntry();
         range.readFields(in);
+        range.setIndex(i);
         index.add(range);
       }
       status.readFields(in);
@@ -2060,6 +2088,8 @@
         out.printf("%s : %s\n", e.getKey(), e.getValue());
       }
       out.println("TFiles within the Column Group :");
+      if (reader.cgindex == null)
+        reader.cgindex = buildIndex(reader.fs, reader.path, reader.dirty, conf);
       for (CGIndexEntry entry : reader.cgindex.index) {
         IOutils.indent(out, indent);
         out.printf(" *Name : %s\n", entry.name);

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Thu Feb 18 17:01:30 2010
@@ -106,7 +106,8 @@
   @Override
   public TableScanner getScanner(BytesWritable begin, BytesWritable end,
       String projection, Configuration conf) throws IOException {
-    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    String[] deletedCGs = getDeletedCGs(conf);
+    BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
     try {
       reader.setProjection(projection);
     } catch (ParseException e) {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Thu Feb 18 17:01:30 2010
@@ -27,7 +27,6 @@
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 
 /**
@@ -105,7 +104,6 @@
    * @see Schema
    * @return A TableScanner object.
    */
-  @SuppressWarnings("unused")
   public TableScanner getScanner(BytesWritable begin,
       BytesWritable end, String projection, Configuration conf)
       throws IOException {
@@ -127,7 +125,7 @@
   public TableScanner getScanner(UnsortedTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(split.getSplit(), true);
   }
@@ -147,7 +145,7 @@
   public TableScanner getScanner(RowTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(true, split.getSplit());
   }
@@ -240,4 +238,31 @@
    * dump table info with indent
    */
   protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+  
+  /**
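+   * Get the deleted column groups for each table in the union.
+   * @param conf The Configuration object
+   * @return one comma-separated list of deleted column group names per table in the union, or null if the front end has not set the INPUT_FE flag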
+   */
+  protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+    return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+  }
+  
+  protected final String[] getDeletedCGs(Configuration conf) {
+    return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+  }
+  
+  private final String[] getDeletedCGs(Configuration conf, String separator) {
+    String[] deletedCGs = null;
+    String fe;
+    if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+    {
+      String original = conf.get(TableInputFormat.INPUT_DELETED_CGS, null);
+      if (original == null)
+        deletedCGs = new String[0]; // empty array needed to indicate it is fe checked
+      else
+        deletedCGs = original.split(separator, -1);
+    }
+    return deletedCGs;
+  }
 }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Thu Feb 18 17:01:30 2010
@@ -143,9 +143,12 @@
 public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
   static Log LOG = LogFactory.getLog(TableInputFormat.class);
   
-  private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
-  private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
-  private static final String INPUT_SORT = "mapred.lib.table.input.sort";
+  public static final String INPUT_EXPR = "mapred.lib.table.input.expr";
+  public static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+  public static final String INPUT_SORT = "mapred.lib.table.input.sort";
+  public static final String INPUT_FE = "mapred.lib.table.input.fe";
+  public static final String INPUT_DELETED_CGS = "mapred.lib.table.input.deleted_cgs";
+  static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
 
   /**
    * Set the paths to the input table.
@@ -645,8 +648,7 @@
   }
   
   private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
-      TableExpr expr, List<BasicTable.Reader> readers, 
-      List<BasicTableStatus> status) throws IOException {
+      TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
 
@@ -718,25 +720,40 @@
         new ArrayList<BasicTableStatus>(nLeaves);
 
     try {
+      StringBuilder sb = new StringBuilder();
+      boolean sorted = expr.sortedSplitRequired();
+      boolean first = true;
       for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
         LeafTableInfo leaf = it.next();
         BasicTable.Reader reader =
           new BasicTable.Reader(leaf.getPath(), conf);
         reader.setProjection(leaf.getProjection());
-        BasicTableStatus s = reader.getStatus();
+        if (sorted)
+        {
+          BasicTableStatus s = reader.getStatus();
+          status.add(s);
+        }
         readers.add(reader);
-        status.add(s);
+        if (first)
+          first = false;
+        else {
+          sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+        }
+        sb.append(reader.getDeletedCGs());
       }
       
+      conf.set(INPUT_FE, "true");
+      conf.set(INPUT_DELETED_CGS, sb.toString());
+      
       if (readers.isEmpty()) {
         return new InputSplit[0];
       }
       
-      if (expr.sortedSplitRequired()) {
+      if (sorted) {
         return getSortedSplits(conf, numSplits, expr, readers, status);
       }
        
-      return getRowSplits(conf, numSplits, expr, readers, status);
+      return getRowSplits(conf, numSplits, expr, readers);
     } catch (ParseException e) {
       throw new IOException("Projection parsing failed : "+e.getMessage());
     }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Thu Feb 18 17:01:30 2010
@@ -105,14 +105,20 @@
       throw new IllegalArgumentException("Union of 0 table");
     }
     ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
-    final ArrayList<BasicTableStatus> status =
-        new ArrayList<BasicTableStatus>(n);
+    String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+    
+    if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+      throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+          n + " actual =" + deletedCGsInUnion.length);
+    
     for (int i = 0; i < n; ++i) {
+      String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+      String[] deletedCGList = (deletedCGs == null ? null : 
+        deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
       BasicTableExpr expr = (BasicTableExpr) composite.get(i);
       BasicTable.Reader reader =
-          new BasicTable.Reader(expr.getPath(), conf);
+          new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
       readers.add(reader);
-      status.add(reader.getStatus());
     }
 
     String actualProjection = projection;

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Thu Feb 18 17:01:30 2010
@@ -106,7 +106,8 @@
   @Override
   public TableScanner getScanner(BytesWritable begin, BytesWritable end,
       String projection, Configuration conf) throws IOException {
-    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    String[] deletedCGs = getDeletedCGs(conf);
+    BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
     try {
       reader.setProjection(projection);
     } catch (ParseException e) {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Thu Feb 18 17:01:30 2010
@@ -27,6 +27,7 @@
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 
 /**
@@ -125,7 +126,7 @@
   public TableScanner getScanner(UnsortedTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(split.getSplit(), true);
   }
@@ -145,7 +146,7 @@
   public TableScanner getScanner(RowTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException, ParseException {
     BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), conf);
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
     reader.setProjection(projection);
     return reader.getScanner(true, split.getSplit());
   }
@@ -238,4 +239,31 @@
    * dump table info with indent
    */
   protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+
+  /**
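+   * Get the deleted column group strings recorded in the configuration, one entry per table in the union.
+   * @param conf The Configuration object
+   * @return the recorded value split on the per-union separator; an empty array if no value is recorded; null if INPUT_FE is not set to "true"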
+   */
+  protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+    return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+  }
+  
+  protected final String[] getDeletedCGs(Configuration conf) {
+    return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+  }
+  
+  private final String[] getDeletedCGs(Configuration conf, String separator) {
+    String[] deletedCGs = null;
+    String fe;
+    if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+    {
+      String original = conf.get(TableInputFormat.INPUT_DELETED_CGS, null);
+      if (original == null)
+        deletedCGs = new String[0]; // empty array needed to indicate it is fe checked
+      else
+        deletedCGs = original.split(separator, -1);
+    }
+    return deletedCGs;
+  }
 }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Thu Feb 18 17:01:30 2010
@@ -146,6 +146,9 @@
   private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
   private static final String INPUT_PROJ = "mapreduce.lib.table.input.projection";
   private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
+  static final String INPUT_FE = "mapreduce.lib.table.input.fe";
+  static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs";
+  static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
 
   /**
    * Set the paths to the input table.
@@ -624,8 +627,7 @@
   }
   
   private static List<InputSplit> getRowSplits(Configuration conf,
-      TableExpr expr, List<BasicTable.Reader> readers, 
-      List<BasicTableStatus> status) throws IOException {
+      TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
     ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
     Job job = new Job(conf);
     DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
@@ -708,24 +710,39 @@
     		new ArrayList<BasicTableStatus>(nLeaves);
 
     	try {
+        StringBuilder sb = new StringBuilder();
+        boolean sorted = expr.sortedSplitRequired();
+        boolean first = true;
     		for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
     			LeafTableInfo leaf = it.next();
     			BasicTable.Reader reader =
     				new BasicTable.Reader(leaf.getPath(), conf );
     			reader.setProjection(leaf.getProjection());
-    			BasicTableStatus s = reader.getStatus();
+          if (sorted)
+          {
+    			  BasicTableStatus s = reader.getStatus();
+    		  	status.add(s);
+          }
     			readers.add(reader);
-    			status.add(s);
+          if (first)
+            first = false;
+          else {
+            sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+          }
+          sb.append(reader.getDeletedCGs());
     		}
 
+        conf.set(INPUT_FE, "true");
+        conf.set(INPUT_DELETED_CGS, sb.toString());
+
     		if( readers.isEmpty() ) {
     			// I think we should throw exception here.
     			return new ArrayList<InputSplit>();
     		}
 
-    		return expr.sortedSplitRequired() ? 
-    				singleSplit ? getSortedSplits( conf, 1, expr, readers, status ) : getSortedSplits(conf, -1, expr, readers, status) : 
-    					getRowSplits( conf, expr, readers, status );
+    		return sorted ? 
+    				singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) : 
+    					getRowSplits( conf, expr, readers);
     	} catch (ParseException e) {
     		throw new IOException("Projection parsing failed : "+e.getMessage());
     	} finally {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Thu Feb 18 17:01:30 2010
@@ -105,14 +105,20 @@
       throw new IllegalArgumentException("Union of 0 table");
     }
     ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
-    final ArrayList<BasicTableStatus> status =
-        new ArrayList<BasicTableStatus>(n);
+    String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+
+    if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+      throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+          n + " actual =" + deletedCGsInUnion.length);
+
     for (int i = 0; i < n; ++i) {
+      String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+      String[] deletedCGList = (deletedCGs == null ? null :
+        deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
       BasicTableExpr expr = (BasicTableExpr) composite.get(i);
       BasicTable.Reader reader =
-          new BasicTable.Reader(expr.getPath(), conf);
+          new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
       readers.add(reader);
-      status.add(reader.getStatus());
     }
 
     String actualProjection = projection;

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Thu Feb 18 17:01:30 2010
@@ -353,7 +353,7 @@
   @Test
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableNormal");
-    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
     doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
     doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
   }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Thu Feb 18 17:01:30 2010
@@ -411,7 +411,6 @@
   @Test
   public void testEmptyCG() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
-    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
   }
@@ -419,14 +418,12 @@
   @Test
   public void testEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
-    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);    
   }
 
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupNormal");
-    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
   }
@@ -435,8 +432,6 @@
   public void testSomeEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
 		for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
-      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
-          emptyTFiles);
       doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
           emptyTFiles);
       doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java Thu Feb 18 17:01:30 2010
@@ -67,7 +67,7 @@
 
   @AfterClass
   public static void tearDownOnce() throws IOException {
-    finish();
+    close();
   }
 
   @SuppressWarnings("unchecked")
@@ -175,9 +175,7 @@
     ColumnGroup.Writer writer2 = writeOnePart(null, 2);
     ColumnGroup.Writer writer3 = writeOnePart(null, 3);
 
-    writer1.finish();
-    writer2.finish();
-    writer3.finish();
+    writer3.close();
 
     // read in parts
     readOnePart(1);
@@ -327,7 +325,7 @@
 
   private static void finish() throws IOException {
     if (writer != null) {
-      writer.finish();
+      writer.close();
     }
   }
 

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Thu Feb 18 17:01:30 2010
@@ -412,7 +412,6 @@
   @Test
   public void testEmptyCG() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
-    doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
   }
@@ -420,14 +419,12 @@
   @Test
   public void testEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
-    doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);    
   }
 
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupNormal");
-    doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
     doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
     doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
   }
@@ -436,8 +433,6 @@
   public void testSomeEmptyTFiles() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
 		for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
-      doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
-          emptyTFiles);
       doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
           emptyTFiles);
       doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java Thu Feb 18 17:01:30 2010
@@ -305,6 +305,7 @@
     // set map-only job.
     jobConf.setNumReduceTasks(0);
     JobClient.runJob(jobConf);
+    BasicTableOutputFormat.close(jobConf);
   }
 
   /**
@@ -599,6 +600,7 @@
     jobConf.setNumReduceTasks(options.numReducer);
 
     JobClient.runJob(jobConf);
+    BasicTableOutputFormat.close(jobConf);
   }
 
   void reduce(Summary sum, Summary delta) {
@@ -950,6 +952,7 @@
     jobConf.setNumReduceTasks(1);
 
     JobClient.runJob(jobConf);
+    BasicTableOutputFormat.close(jobConf);
   }
 
   void printFreqWords() throws IOException, ParseException {

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java Thu Feb 18 17:01:30 2010
@@ -108,6 +108,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
 
     /*
      * create 2nd basic table;
@@ -141,6 +142,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java Thu Feb 18 17:01:30 2010
@@ -126,7 +126,8 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
-
+    writer.close();
+    
     /*
      * create 2nd basic table;
      */
@@ -159,7 +160,8 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
-
+    writer.close();
+    
     /*
      * create 3rd basic table;
      */
@@ -192,6 +194,8 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
+    
     /*
      * create 4th basic table;
      */
@@ -224,6 +228,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
     /*
      * create 5th basic table;
      */
@@ -256,7 +261,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
-
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java Thu Feb 18 17:01:30 2010
@@ -118,6 +118,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass
@@ -137,4 +138,4 @@
       System.out.println(cur);
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java Thu Feb 18 17:01:30 2010
@@ -159,6 +159,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java Thu Feb 18 17:01:30 2010
@@ -114,6 +114,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java Thu Feb 18 17:01:30 2010
@@ -103,6 +103,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java Thu Feb 18 17:01:30 2010
@@ -149,6 +149,7 @@
     for (int i = 0; i < numsInserters; i++) {
       inserters[i].close();
     }
+    writer.close();
   }
 
   @AfterClass

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java Thu Feb 18 17:01:30 2010
@@ -359,9 +359,10 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
+
     BasicTable.dumpInfo(path1.toString(), ps, conf);
     System.out.println("start dumpinfo ===========\n" + bos.toString());
     Assert.assertEquals(true, bos.toString().contains("Serializer: pig"));
@@ -382,7 +383,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -404,7 +405,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -426,7 +427,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -448,7 +449,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -470,7 +471,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -491,7 +492,7 @@
     } catch (Exception e) {
       e.printStackTrace();
     }
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -605,7 +606,7 @@
     fs = path.getFileSystem(conf);
     BasicTable.Writer writer = new BasicTable.Writer(path1, schema, storage,
         conf);
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     System.out.println("start dumpinfo 17 ===========");
@@ -699,7 +700,7 @@
     fs = path.getFileSystem(conf);
     BasicTable.Writer writer = new BasicTable.Writer(path1, schema, storage,
         conf);
-    writer.finish();
+    writer.close();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream ps = new PrintStream(bos);
     System.out.println("start dumpinfo 22===========" + bos.toString());