You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/28 14:16:54 UTC

svn commit: r928385 [1/3] - in /hadoop/pig/branches/branch-0.7/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/mapreduce/ src/java/org/apache/hadoop/zebra/pig/ src/test/org/apache/hadoop/zebra/ src/test/org/apache/hadoop/zebra/mapred/ src/test/org/a...

Author: yanz
Date: Sun Mar 28 12:16:54 2010
New Revision: 928385

URL: http://svn.apache.org/viewvc?rev=928385&view=rev
Log:
PIG-1306 Support of locally sorted input splits (yanz)

Modified:
    hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveSimple.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveUnion.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
    hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionSourceTableProj.java

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Sun Mar 28 12:16:54 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1306 Support of locally sorted input splits (yanz)
+
     PIG-1268 Need an ant target that runs all pig-related tests in Zebra (xuefuz via yanz)
 
     PIG-1207 Data sanity check should be performed at the end of writing instead of later at query time (yanz)

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Sun Mar 28 12:16:54 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
 
 /**
  * Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
   } 
 
   @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    return new BasicTableScanner(split, projection, conf);
+  }
+  
+  @Override
   public Schema getSchema(Configuration conf) throws IOException {
     return BasicTable.Reader.getSchema(path, conf);
   }
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
   {
     BasicTable.dumpInfo(path.toString(), ps, conf, indent);
   }
+  
+  /**
+   * Basic Table Scanner
+   */
+  class BasicTableScanner implements TableScanner {
+    private int tableIndex = -1;
+    private Integer[] virtualColumnIndices = null;
+    private TableScanner scanner = null;
+    
+    BasicTableScanner(RowTableSplit split, String projection,
+        Configuration conf) throws IOException, ParseException, ParseException {
+      tableIndex = split.getTableIndex();
+      virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+      BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
+      reader.setProjection(projection);
+      scanner = reader.getScanner(true, split.getSplit());
+    }
+    
+    @Override
+    public boolean advance() throws IOException {
+      return scanner.advance();
+    }
+    
+    @Override
+    public boolean atEnd() throws IOException {
+      return scanner.atEnd();
+    }
+    
+    @Override
+    public Schema getSchema() {
+      return scanner.getSchema();
+    }
+    
+    @Override
+    public void getKey(BytesWritable key) throws IOException {
+      scanner.getKey(key);
+    }
+    
+    @Override
+    public void getValue(Tuple row) throws IOException {
+      scanner.getValue(row);
+      if (virtualColumnIndices != null)
+      {
+        for (int i = 0; i < virtualColumnIndices.length; i++)
+        {
+          row.set(virtualColumnIndices[i], tableIndex);
+        }
+      }
+    }
+    
+    @Override
+    public boolean seekTo(BytesWritable key) throws IOException {
+      return scanner.seekTo(key);
+    }
+    
+    @Override
+    public void seekToEnd() throws IOException {
+      scanner.seekToEnd();
+    }
+    
+    @Override 
+    public void close() throws IOException {
+      scanner.close();
+    }
+    
+    @Override
+    public String getProjection() {
+      return scanner.getProjection();
+    }
+  }
 }

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Sun Mar 28 12:16:54 2010
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 
 /**
@@ -132,10 +131,10 @@ abstract class TableExpr {
   }
   
   /**
-   * Get a scanner with an unsorted split.
+   * Get a scanner with a row split.
    * 
    * @param split
-   *          The range split.
+   *          The row split.
    * @param projection
    *          The projection schema. It should never be null.
    * @param conf
@@ -144,11 +143,8 @@ abstract class TableExpr {
    * @throws IOException
    */
   public TableScanner getScanner(RowTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
-    reader.setProjection(projection);
-    return reader.getScanner(true, split.getSplit());
+      Configuration conf) throws IOException, ParseException {
+    return null;
   }
   
   /**

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Sun Mar 28 12:16:54 2010
@@ -144,6 +144,12 @@ import org.apache.pig.data.Tuple;
  * </UL>
  */
 public class TableInputFormat extends InputFormat<BytesWritable, Tuple> {
+  public enum SplitMode {
+    UNSORTED, /* Data is not sorted. Default split mode*/
+    LOCALLY_SORTED, /* Output by each mapper is sorted */
+    GLOBALLY_SORTED /* Output is locally sorted and the key ranges are not overlapped, not even on boundary */
+  };
+    
   static Log LOG = LogFactory.getLog(TableInputFormat.class);
   
   private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
@@ -151,6 +157,10 @@ public class TableInputFormat extends In
   private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
   static final String INPUT_FE = "mapreduce.lib.table.input.fe";
   static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs";
+  private static final String INPUT_SPLIT_MODE = "mapreduce.lib.table.input.split_mode";
+  private static final String UNSORTED = "unsorted";
+  private static final String GLOBALLY_SORTED = "globally_sorted";
+  private static final String LOCALLY_SORTED = "locally_sorted";
   static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
 
   /**
@@ -316,16 +326,28 @@ public class TableInputFormat extends In
     return null;
   }
       
-  /**
-   * Set requirement for sorted table
-   *
-   *@param conf
-   *          Configuration object.
-   */
-  private static void setSorted(Configuration conf) {
+  private static boolean globalOrderingRequired(JobContext jobContext)
+  {
+    Configuration conf = jobContext.getConfiguration();
+    String result = conf.get(INPUT_SPLIT_MODE, UNSORTED);
+    return result.equalsIgnoreCase(GLOBALLY_SORTED);
+  }
+
+  private static void setSorted(JobContext jobContext) {
+    Configuration conf = jobContext.getConfiguration();
     conf.setBoolean(INPUT_SORT, true);
   }
   
+  private static void setSorted(JobContext jobContext, SplitMode sm)
+  {
+    setSorted(jobContext);
+    Configuration conf = jobContext.getConfiguration();
+	  if (sm == SplitMode.GLOBALLY_SORTED)
+      conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED);
+    else if (sm == SplitMode.LOCALLY_SORTED)
+      conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED);
+  }
+  
   /**
    * Get the SortInfo object regarding a Zebra table
    *
@@ -368,13 +390,33 @@ public class TableInputFormat extends In
   /**
    * Requires sorted table or table union
    * 
+   * @deprecated
+   * 
    * @param jobContext
    *          JobContext object.
    * @param sortInfo
    *          ZebraSortInfo object containing sorting information.
    *        
    */
-  public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+  
+   public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+     setSplitMode(jobContext, SplitMode.GLOBALLY_SORTED, sortInfo);
+   }
+
+  /**
+   * 
+   * @param jobContext
+   *          JobContext object
+   * @param sm
+   *          Split mode: unsorted, globally sorted, locally sorted. Default is unsorted
+   * @param sortInfo
+   *          ZebraSortInfo object containing sorting information. Will be ignored if
+   *          the split mode is null or unsorted
+   * @throws IOException
+   */
+  public static void setSplitMode(JobContext jobContext, SplitMode sm, ZebraSortInfo sortInfo) throws IOException {
+   if (sm == null || sm == SplitMode.UNSORTED)
+    return;
 	 TableExpr expr = getInputExpr( jobContext );
      Configuration conf = jobContext.getConfiguration();
 	 String comparatorName = null;
@@ -428,8 +470,7 @@ public class TableInputFormat extends In
        }
 		 }
 	 }
-   // need key range input splits for sorted table union
-   	 setSorted(conf);
+	 setSorted(jobContext, sm);
   }
   
   /**
@@ -599,6 +640,7 @@ public class TableInputFormat extends In
       SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
       splits.add(split);
     }
+    LOG.info("getSplits : returning " + splits.size() + " sorted splits.");
     return splits;
   }
   
@@ -716,7 +758,8 @@ public class TableInputFormat extends In
               FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
               // reorder according to CG index
               BasicTable.Reader reader = readers.get(index);
-              reader.rearrangeFileIndices(fileStatuses);
+              if (fileStatuses.length > 1)
+                reader.rearrangeFileIndices(fileStatuses);
               for(FileStatus stat: fileStatuses) {
                 if (stat != null)
                   result.add(stat);
@@ -758,6 +801,7 @@ public class TableInputFormat extends In
     boolean first = true;
     PathFilter filter = null;
     List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+    int[] realReaderIndices = new int[readers.size()];
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -766,7 +810,8 @@ public class TableInputFormat extends In
       
       /* We can create input splits only if there does exist a valid column group for split.
        * Otherwise, we do not create input splits. */
-      if (splitCGIndex >= 0) {        
+      if (splitCGIndex >= 0) {
+        realReaderIndices[realReaders.size()] = i;
         realReaders.add(reader);
          if (first)
          {
@@ -862,11 +907,13 @@ public class TableInputFormat extends In
         if (splitLen > 0)
           batches[++numBatches] = splitLen;
         
-        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex,
+            batches, numBatches);
         
+        int realTableIndex = realReaderIndices[tableIndex];
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
-          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          RowTableSplit split = new RowTableSplit(reader, subSplit, realTableIndex, conf);
           ret.add(split);
         }
       }
@@ -884,7 +931,7 @@ public class TableInputFormat extends In
 	    return getSplits( jobContext, false );
     }
 
-    private List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
+    List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
     	Configuration conf = jobContext.getConfiguration();
     	TableExpr expr = getInputExpr( conf );
     	if( getSorted(conf) ) {
@@ -938,9 +985,20 @@ public class TableInputFormat extends In
     			return new ArrayList<InputSplit>();
     		}
 
-    		return sorted ? 
-    				singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) : 
-    					getRowSplits( conf, expr, readers, status);
+        List<InputSplit> result;
+        if (sorted)
+        {
+          if (singleSplit)
+            result = getSortedSplits( conf, 1, expr, readers, status);
+          else if (globalOrderingRequired(jobContext))
+            result = getSortedSplits(conf, -1, expr, readers, status);
+          else
+            result = getRowSplits( conf, expr, readers, status);
+        } else
+          result = getRowSplits( conf, expr, readers, status);
+
+        return result;
+
     	} catch (ParseException e) {
     		throw new IOException("Projection parsing failed : "+e.getMessage());
     	} finally {
@@ -1166,15 +1224,17 @@ class RowTableSplit extends InputSplit i
   /**
      * 
      */
-String path = null;
+  String path = null;
+  int tableIndex = 0;
   RowSplit split = null;
   String[] hosts = null;
   long length = 1;
 
-  public RowTableSplit(Reader reader, RowSplit split, Configuration conf)
+  public RowTableSplit(Reader reader, RowSplit split, int tableIndex, Configuration conf)
       throws IOException {
     this.path = reader.getPath();
     this.split = split;
+    this.tableIndex = tableIndex;
     BlockDistribution dataDist = reader.getBlockDistribution(split);
     if (dataDist != null) {
       length = dataDist.getLength();
@@ -1199,6 +1259,7 @@ String path = null;
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    tableIndex = WritableUtils.readVInt(in); 
     path = WritableUtils.readString(in);
     int bool = WritableUtils.readVInt(in);
     if (bool == 1) {
@@ -1214,6 +1275,7 @@ String path = null;
 
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, tableIndex);
     WritableUtils.writeString(out, path);
     if (split == null) {
       WritableUtils.writeVInt(out, 0);
@@ -1233,4 +1295,8 @@ String path = null;
   public RowSplit getSplit() {
     return split;
   }
+  
+  public int getTableIndex() {
+    return tableIndex;
+  }
 }

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Sun Mar 28 12:16:54 2010
@@ -53,19 +53,18 @@ public class TableRecordReader extends R
    */
   public TableRecordReader(TableExpr expr, String projection,
       InputSplit split, JobContext jobContext) throws IOException, ParseException {
-	Configuration conf = jobContext.getConfiguration();
-    if (expr.sortedSplitRequired()) {
+	  Configuration conf = jobContext.getConfiguration();
+	  if (split instanceof RowTableSplit) {
+      RowTableSplit rowSplit = (RowTableSplit) split;
+      if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) &&
+          Projection.getVirtualColumnIndices(projection) != null)
+        throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
+      scanner = expr.getScanner(rowSplit, projection, conf);
+	  } else {
       SortedTableSplit tblSplit = (SortedTableSplit) split;
       scanner =
           expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
               conf);
-    } else if (split != null && split instanceof RowTableSplit) {
-      RowTableSplit rowSplit = (RowTableSplit) split;
-      scanner = expr.getScanner(rowSplit, projection, conf);
-    }
-    else {
-      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
-      scanner = expr.getScanner(tblSplit, projection, conf);
     }
   }
   

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Sun Mar 28 12:16:54 2010
@@ -29,7 +29,6 @@ import java.util.concurrent.PriorityBloc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.io.BasicTableStatus;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Projection;
@@ -157,6 +156,13 @@ class TableUnionExpr extends CompositeTa
       throw new IllegalArgumentException("virtual column requires union of multiple tables");
     return new SortedTableUnionScanner(scanners, Projection.getVirtualColumnIndices(projection));
   }
+  
+  @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTableExpr expr = (BasicTableExpr) composite.get(split.getTableIndex());
+    return expr.getScanner(split, projection, conf);
+  }
 }
 
 /**

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Sun Mar 28 12:16:54 2010
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
 import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.ColumnType;
 import org.apache.hadoop.zebra.schema.Schema;
@@ -57,15 +59,17 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.hadoop.zebra.pig.comparator.*;
 import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.CollectableLoadFunc;
 
 /**
  * Pig IndexableLoadFunc and Slicer for Zebra Table
  */
 public class TableLoader extends LoadFunc implements LoadMetadata, LoadPushDown,
-        IndexableLoadFunc{
+        IndexableLoadFunc, CollectableLoadFunc {
     static final Log LOG = LogFactory.getLog(TableLoader.class);
 
     private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString";
+    private static final String UDFCONTEXT_GLOBAL_SORTING = "zebra.UDFContext.globalSorting";
 
     private String projectionString;
 
@@ -170,14 +174,19 @@ public class TableLoader extends LoadFun
      * @throws IOException
      */
     private void setProjection(Job job) throws IOException {
+      Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+          this.getClass(), new String[]{ udfContextSignature } );
+      boolean requireGlobalOrder = "true".equals(properties.getProperty( UDFCONTEXT_GLOBAL_SORTING));
+      if (requireGlobalOrder && !sorted)
+        throw new IOException("Global sorting can be only asked on table loaded as sorted");
         if( sorted ) {
-            TableInputFormat.requireSortedTable( job, null );
+            SplitMode splitMode = 
+              requireGlobalOrder ? SplitMode.GLOBALLY_SORTED : SplitMode.LOCALLY_SORTED;
+            TableInputFormat.setSplitMode(job, splitMode, null);
             sortInfo = TableInputFormat.getSortInfo( job );
         }
         
         try {
-            Properties properties = UDFContext.getUDFContext().getUDFProperties( 
-                    this.getClass(), new String[]{ udfContextSignature } );
             String prunedProjStr = properties.getProperty( UDFCONTEXT_PROJ_STRING );
             
             if( prunedProjStr != null ) {
@@ -414,4 +423,11 @@ public class TableLoader extends LoadFun
         public void setUDFContextSignature(String signature) {
             udfContextSignature = signature;
         }
+        
+    @Override
+    public void ensureAllKeyInstancesInSameSplit() throws IOException {
+      Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+          new String[] { udfContextSignature } );
+      properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true");
+    }
 }

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java Sun Mar 28 12:16:54 2010
@@ -20,6 +20,9 @@ package org.apache.hadoop.zebra;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 
 import junit.framework.Assert;
 
@@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configured
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
 
 public class BaseTestCase extends Configured {
   protected static PigServer pigServer = null;
@@ -126,6 +130,45 @@ public class BaseTestCase extends Config
     }
   }
   
+  /**
+   * Verify union output table with expected results
+   * 
+   */
+  protected int verifyTable(HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable,
+      int keyColumn, int tblIdxCol, Iterator<Tuple> it) throws IOException {
+    int numbRows = 0;
+    int index = 0, rowIndex = -1, rowCount = -1, prevIndex = -1;
+    Object value;
+    boolean first = true;
+    ArrayList<ArrayList<Object>> rows = null;
+    
+    while (it.hasNext()) {
+      Tuple rowValues = it.next();
+      
+      if (first) {
+        index = (Integer) rowValues.get(tblIdxCol);
+        Assert.assertNotSame(prevIndex, index);
+        rows = resultTable.get(index);
+        rowIndex = 0;
+        rowCount = rows.size();
+        first = false;
+      }
+      value = rows.get(rowIndex++).get(keyColumn);
+      Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+          + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+      
+      if (rowIndex == rowCount)
+      {
+        // current table is run out; start on a new table for next iteration
+        first = true;
+        prevIndex = index;
+      }
+      
+      ++numbRows;
+    }
+    return numbRows;
+  }
+  
   public static void checkTableExists(boolean expected, String strDir) throws IOException {  
     File theDir = null; 
     boolean actual = false;

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java Sun Mar 28 12:16:54 2010
@@ -37,7 +37,7 @@ public class ToolTestComparator extends 
 
   final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
       + "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string),"
-      + "c1:collection(a:string, b:string)";
+      + "c1:collection(record(a:string, b:string))";
   final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
 
   private static Random generator = new Random();
@@ -399,13 +399,15 @@ public class ToolTestComparator extends 
       tuple.set(12, tupRecord1);
 
       // insert collection1
-      tupColl1.set(0, "c1 a " + seed);
-      tupColl1.set(1, "c1 a " + i);
-      bag1.add(tupColl1); // first collection item
-
-      tupColl2.set(0, "c1 b " + seed);
-      tupColl2.set(1, "c1 b " + i);
-      bag1.add(tupColl2); // second collection item
+      // tupColl1.set(0, "c1 a " + seed);
+      // tupColl1.set(1, "c1 a " + i);
+      // bag1.add(tupColl1); // first collection item
+      bag1.add(tupRecord1); // first collection item
+      bag1.add(tupRecord1); // second collection item
+
+      // tupColl2.set(0, "c1 b " + seed);
+      // tupColl2.set(1, "c1 b " + i);
+      // bag1.add(tupColl2); // second collection item
 
       tuple.set(13, bag1);
 

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java Sun Mar 28 12:16:54 2010
@@ -19,45 +19,32 @@
 package org.apache.hadoop.zebra.mapreduce;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableInserter;
 import org.apache.hadoop.zebra.io.TableScanner;
-import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
 import org.apache.hadoop.zebra.mapreduce.SortedTableSplit;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.pig.TableStorer;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
+import org.apache.hadoop.zebra.BaseTestCase;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
 import org.junit.Assert;
 
 /**
@@ -66,25 +53,15 @@ import org.junit.Assert;
  * Utility for verifying tables created during Zebra Stress Testing
  * 
  */
-public class ToolTestComparator {
+public class ToolTestComparator extends BaseTestCase {
 	final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
 		+ "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string),"
-		+ "c1:collection(a:string, b:string)";
+		+ "c1:collection(record(a:string, b:string))";
 	final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
 
 	private static Random generator = new Random();
 
-	private static Configuration conf;
-	private static FileSystem fs;
-
-	protected static ExecType execType = ExecType.MAPREDUCE;
-	private static MiniCluster cluster;
-	protected static PigServer pigServer;
 	protected static ExecJob pigJob;
-	private static Path path;
-
-	private static String zebraJar;
-	private static String whichCluster;
 
 	private static int totalNumbCols;
 	private static long totalNumbVerifiedRows;
@@ -93,65 +70,7 @@ public class ToolTestComparator {
 	 * Setup and initialize environment
 	 */
 	public static void setUp() throws Exception {
-		System.out.println("setUp()");
-		if (System.getProperty("hadoop.log.dir") == null) {
-			String base = new File(".").getPath(); // getAbsolutePath();
-			System
-			.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
-		}
-
-
-		if (System.getProperty("whichCluster") == null) {
-			System.setProperty("whichCluster", "miniCluster");
-			whichCluster = System.getProperty("whichCluster");
-		} else {
-			whichCluster = System.getProperty("whichCluster");
-		}
-
-		System.out.println("cluster: " + whichCluster);
-		if (whichCluster.equalsIgnoreCase("realCluster")
-				&& System.getenv("HADOOP_HOME") == null) {
-			System.out.println("Please set HADOOP_HOME");
-			System.exit(0);
-		}
-
-		conf = new Configuration();
-
-		if (whichCluster.equalsIgnoreCase("realCluster")
-				&& System.getenv("USER") == null) {
-			System.out.println("Please set USER");
-			System.exit(0);
-		}
-		zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
-		File file = new File(zebraJar);
-		if (!file.exists() && whichCluster.equalsIgnoreCase("realCulster")) {
-			System.out.println("Please put zebra.jar at hadoop_home/../jars");
-			System.exit(0);
-		}
-
-		if (whichCluster.equalsIgnoreCase("realCluster")) {
-			System.out.println("Running realCluster");
-			pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
-					.toProperties(conf));
-			pigServer.registerJar(zebraJar);
-			path = new Path("/user/" + System.getenv("USER") + "/TestComparator");
-			// removeDir(path);
-			fs = path.getFileSystem(conf);
-		}
-
-		if (whichCluster.equalsIgnoreCase("miniCluster")) {
-			System.out.println("Running miniCluster");
-			if (execType == ExecType.MAPREDUCE) {
-				cluster = MiniCluster.buildCluster();
-				pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-				fs = cluster.getFileSystem();
-				path = new Path(fs.getWorkingDirectory() + "/TestComparator");
-				// removeDir(path);
-				System.out.println("path1 =" + path);
-			} else {
-				pigServer = new PigServer(ExecType.LOCAL);
-			}
-		}
+		init();
 	}
 
 	/**
@@ -389,7 +308,7 @@ public class ToolTestComparator {
 
 			while (it1.hasNext()) {
 				++numbRows1; // increment row count
-				Tuple rowValue = it1.next();
+				it1.next();
 			}
 			numbRows.add(numbRows1);
 			numbUnionRows += numbRows1;
@@ -499,13 +418,15 @@ public class ToolTestComparator {
 			tuple.set(12, tupRecord1);
 
 			// insert collection1
-			tupColl1.set(0, "c1 a " + seed);
-			tupColl1.set(1, "c1 a " + i);
-			bag1.add(tupColl1); // first collection item
-
-			tupColl2.set(0, "c1 b " + seed);
-			tupColl2.set(1, "c1 b " + i);
-			bag1.add(tupColl2); // second collection item
+			// tupColl1.set(0, "c1 a " + seed);
+			// tupColl1.set(1, "c1 a " + i);
+			// bag1.add(tupColl1); // first collection item
+      bag1.add(tupRecord1); // first collection item
+      bag1.add(tupRecord1); // second collection item
+
+			// tupColl2.set(0, "c1 b " + seed);
+			// tupColl2.set(1, "c1 b " + i);
+			// bag1.add(tupColl2); // second collection item
 
 			tuple.set(13, bag1);
 
@@ -604,7 +525,7 @@ public class ToolTestComparator {
 		TableInputFormat.requireSortedTable(job, null);
 		TableInputFormat tif = new TableInputFormat();
 
-		SortedTableSplit split = (SortedTableSplit) tif.getSplits(job).get(0);
+		SortedTableSplit split = (SortedTableSplit) tif.getSplits(job, true).get(0);
 
 		TableScanner scanner = reader.getScanner(split.getBegin(), split.getEnd(), true);
 		BytesWritable key = new BytesWritable();
@@ -731,12 +652,9 @@ public class ToolTestComparator {
 		TableInputFormat.setInputPaths(job, new Path(pathTable1));
 
 		TableInputFormat.requireSortedTable(job, null);
-		TableInputFormat tif = new TableInputFormat();
-
 
 		TableScanner scanner = reader.getScanner(null, null, true);
 		BytesWritable key = new BytesWritable();
-		Tuple rowValue = TypesUtils.createTuple(scanner.getSchema());
 
 		while (!scanner.atEnd()) {
 			++numbRows;
@@ -746,41 +664,6 @@ public class ToolTestComparator {
 		System.out.println("\nTable Path : " + pathTable1);
 		System.out.println("Table Row number : " + numbRows);
 	}
-	/**
-	 * Compare table rows
-	 * 
-	 */
-	private static boolean compareRow(Tuple rowValues1, Tuple rowValues2)
-	throws IOException {
-		boolean result = true;
-		Assert.assertEquals(rowValues1.size(), rowValues2.size());
-		for (int i = 0; i < rowValues1.size(); ++i) {
-			if (!compareObj(rowValues1.get(i), rowValues2.get(i))) {
-				System.out.println("DEBUG: " + " RowValue.get(" + i
-						+ ") value compare error : " + rowValues1.get(i) + " : "
-						+ rowValues2.get(i));
-				result = false;
-				break;
-			}
-		}
-		return result;
-	}
-
-	/**
-	 * Compare table values
-	 * 
-	 */
-	private static boolean compareObj(Object object1, Object object2) {
-		if (object1 == null) {
-			if (object2 == null)
-				return true;
-			else
-				return false;
-		} else if (object1.equals(object2))
-			return true;
-		else
-			return false;
-	}
 
 	/**
 	 * Compares two objects that implement the Comparable interface
@@ -837,28 +720,6 @@ public class ToolTestComparator {
 	}
 
 	/**
-	 * Remove directory
-	 * 
-	 */
-	public static void removeDir(Path outPath) throws IOException {
-		String command = null;
-		if (whichCluster.equalsIgnoreCase("realCluster")) {
-			command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
-			+ outPath.toString();
-		} else {
-			command = "rm -rf " + outPath.toString();
-		}
-		Runtime runtime = Runtime.getRuntime();
-		Process proc = runtime.exec(command);
-		int exitVal = -1;
-		try {
-			exitVal = proc.waitFor();
-		} catch (InterruptedException e) {
-			System.err.println(e);
-		}
-	}
-
-	/**
 	 * Calculate elapsed time
 	 * 
 	 */
@@ -1021,7 +882,6 @@ public class ToolTestComparator {
 				// Verify merge-join table is in sorted order
 				verifyMergeJoin(pathTable1, sortCol, sortString, numbCols, rowMod,verifyDataColName);
 			} else if (verifyOption.equals("sorted-union")) {
-				Object lastVal = null;
 
 				// Verify sorted-union table is in sorted order
 				verifySortedUnion(unionPaths, pathTable1, sortCol, sortString,
@@ -1045,7 +905,6 @@ public class ToolTestComparator {
 				// Create sorted table
 				createsortedtable(pathTable1, pathTable2, sortString, debug);
 			}else if (verifyOption.equals("printrownumber")) {
-				Object lastVal = null;
 				//print total number of rows of the table
 				printRowNumber(pathTable1,sortString);
 			}

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Sun Mar 28 12:16:54 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.Path;
@@ -159,7 +160,7 @@ public class TestOrderPreserveMultiTable
 			paths += ++fileId + ",";
 		paths = paths.substring(0, paths.lastIndexOf(","));  // remove trailing comma
     paths += "}";
-		
+    
 		String queryLoad = "records1 = LOAD '"
 	        + paths
 	        +	"' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
@@ -185,25 +186,30 @@ public class TestOrderPreserveMultiTable
 		}
 		
 		// Test with input tables and provided output columns
-		testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+		testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1, source_table");
 		
 		// Create results table for verification
-		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+		HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable = 
+		  new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+		
+		// The ordering from FileInputFormat glob expansion.
+		int[] tblIndexList = {0, 9, 1, 2, 3, 4, 5, 6, 7, 8};
+		
 		for (int i=0; i<NUMB_TABLE; ++i) {
+		  ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
 			for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
 				ArrayList<Object> resultRow = new ArrayList<Object>();
-				
-				resultRow.add(i);	// int1
+				resultRow.add(tblIndexList[i]);	// int1
 				resultRow.add(new String("string" + j));	// str1
 				resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j)));	// byte1
-				
-				resultTable.add(resultRow);
+				rows.add(resultRow);
 			}
+			resultTable.put(i, rows);
 		}
 		
 		// Verify union table
 		Iterator<Tuple> it = pigServer.openIterator("records1");
-		int numbRows = verifyTable(resultTable, 0, it);
+		int numbRows = verifyTable(resultTable, 0, 3, it);
 		
 		Assert.assertEquals(totalTableRows, numbRows);
 		
@@ -212,100 +218,6 @@ public class TestOrderPreserveMultiTable
 	}
 	
 	/**
-	 * Verify union output table with expected results
-	 * 
-	 */
-	private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
-		int numbRows = 0;
-		int index = 0;
-		Object value = resultTable.get(index).get(keyColumn);  // get value of primary key
-		
-		while (it.hasNext()) {
-			Tuple rowValues = it.next();
-			
-			// If last primary sort key does match then search for next matching key
-			if (! compareObj(value, rowValues.get(keyColumn))) {
-				int subIndex = index + 1;
-				while (subIndex < resultTable.size()) {
-					if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
-						index = subIndex;
-						value = resultTable.get(index).get(keyColumn);
-						break;
-					}
-					++subIndex;
-				}
-				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
-					+ rowValues.get(keyColumn), value, rowValues.get(keyColumn));
-			}
-			// Search for matching row with this primary key
-			int subIndex = index;
-			
-			while (subIndex < resultTable.size()) {
-				// Compare row
-				ArrayList<Object> resultRow = resultTable.get(subIndex);
-				if ( compareRow(rowValues, resultRow) )
-					break; // found matching row
-				++subIndex;
-				Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : "
-					+ rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn));
-			}
-			++numbRows;
-		}
-		Assert.assertEquals(resultTable.size(), numbRows);  // verify expected row count
-		return numbRows;
-	}
-	
-	/**
-	 * Compare table rows
-	 * 
-	 */
-	private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
-		boolean result = true;
-		Assert.assertEquals(resultRow.size(), rowValues.size());
-		for (int i = 0; i < rowValues.size(); ++i) {
-			if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
-				result = false;
-				break;
-			}
-		}
-		return result;
-	}
-	
-	/**
-	 * Compare table values
-	 * 
-	 */
-	private boolean compareObj(Object object1, Object object2) {
-		if (object1 == null) {
-			if (object2 == null)
-				return true;
-			else
-				return false;
-		} else if (object1.equals(object2))
-			return true;
-		else
-			return false;
-	}
-	
-	/**
-	 * Print Pig Table (for debugging)
-	 * 
-	 */
-	private int printTable(String tablename) throws IOException {
-		Iterator<Tuple> it1 = pigServer.openIterator(tablename);
-		int numbRows = 0;
-		while (it1.hasNext()) {
-			Tuple RowValue1 = it1.next();
-			++numbRows;
-			System.out.println();
-			for (int i = 0; i < RowValue1.size(); ++i)
-				System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
-		}
-		System.out.println("\nRow count : " + numbRows);
-		return numbRows;
-	}
-	
-	/**
 	 * Return the name of the routine that called getCurrentMethodName
 	 * 
 	 */