Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC

svn commit: r909667 [1/9] - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...

Author: yanz
Date: Sat Feb 13 00:06:15 2010
New Revision: 909667

URL: http://svn.apache.org/viewvc?rev=909667&view=rev
Log:
PIG-1140 Support of Hadoop 2.0 API (xuefuz via yanz)

Added:
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ArticleGenerator.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/Dictionary.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample1.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSample2.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSampleSortedTable.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMRSortedTableZebraKeyGenerator.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TableMapReduceExample.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFMoreLocal.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatDFS.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableIOFormatLocalFS.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestCheckin.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs2.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs2TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs3.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs3TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs4.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputs4TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputsTypeApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestMultipleOutputsTypedApiNeg.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
Modified:
    hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs2TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs3TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputs4TypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypeApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestMultipleOutputsTypedApiNeg.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestTypedApi2.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoin.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
    hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Sat Feb 13 00:06:15 2010
@@ -12,6 +12,8 @@
 
   IMPROVEMENTS
 
+    PIG-1140 Support of Hadoop 2.0 API (xuefuz via yanz)
+
     PIG-1170 new end-to-end and stress test cases (jing1234 via yanz)
 
     PIG-1136 Support of map split on hash keys with leading underscore (xuefuz via yanz)

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/build.xml Sat Feb 13 00:06:15 2010
@@ -106,6 +106,8 @@
     <echo message="contrib: ${name}"/>
     <delete dir="${pig.log.dir}"/>
     <mkdir dir="${pig.log.dir}"/>
+    <delete dir="${build.test}/data"/>
+    <mkdir dir="${build.test}/data"/>
     <junit
       printsummary="yes" showoutput="${test.output}" 
       haltonfailure="no" fork="yes" maxmemory="1024m"

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Sat Feb 13 00:06:15 2010
@@ -49,7 +49,6 @@
 import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
 import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
 import org.apache.hadoop.zebra.tfile.Utils.Version;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGScanner;
@@ -90,8 +89,6 @@
       new Version((short) 1, (short) 1);
   // name of the BasicTable meta-data file
   private final static String BT_META_FILE = ".btmeta";
-  // column group prefix
-  private final static String CGPathPrefix = "CG";
 
   private final static String DELETED_CG_PREFIX = ".deleted-";
   
@@ -815,7 +812,8 @@
      * A row-based split on the zebra table;
      */
     public static class RowSplit implements Writable {
-      int cgIndex;  // column group index where split lies on;
+		
+	int cgIndex;  // column group index where split lies on;
       CGRowSplit slice; 
 
       RowSplit(int cgidx, CGRowSplit split) {
@@ -1451,7 +1449,7 @@
         }
       }
     }
-
+    
     /**
      * Get the schema of the table.
      * 

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Sat Feb 13 00:06:15 2010
@@ -392,10 +392,10 @@
             "Cannot get key-bounded scanner for unsorted table");
       }
       RawComparable begin =
-          (beginKey != null) ? new ByteArray(beginKey.get(), 0, beginKey
-              .getSize()) : null;
+          (beginKey != null) ? new ByteArray(beginKey.getBytes(), 0, beginKey
+              .getLength()) : null;
       RawComparable end =
-          (endKey != null) ? new ByteArray(endKey.get(), 0, endKey.getSize())
+          (endKey != null) ? new ByteArray(endKey.getBytes(), 0, endKey.getLength())
               : null;
       if (begin != null && end != null) {
         if (comparator.compare(begin, end) >= 0) {
@@ -800,7 +800,8 @@
             scanner = reader.createScannerByRecordNum(rowRange.startRow, 
                                          rowRange.startRow + rowRange.numRows);
           } else {
-            /* using deprecated API just so that zebra can work with 
+            /* TODO: more investigation is needed for the following.
+             *  using deprecated API just so that zebra can work with 
              * hadoop jar that does not contain HADOOP-6218 (Record ids for
              * TFile). This is expected to be temporary. Later we should 
              * use the undeprecated API.
@@ -865,7 +866,7 @@
       }
 
       boolean seekTo(BytesWritable key) throws IOException {
-        return scanner.seekTo(key.get(), 0, key.getSize());
+        return scanner.seekTo(key.getBytes(), 0, key.getLength());
       }
 
       boolean advance() throws IOException {
@@ -1093,7 +1094,7 @@
           return false;
         }
         int index =
-            cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
+            cgindex.lowerBound(new ByteArray(key.getBytes(), 0, key.getLength()),
                 comparator);
         if (index >= endIndex) {
           seekToEnd();
@@ -1596,9 +1597,9 @@
       @Override
       public void insert(BytesWritable key, Tuple row) throws IOException {
         TypesUtils.checkCompatible(row, getSchema());
-        DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getSize());
+        DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getLength());
         try {
-          outKey.write(key.get(), 0, key.getSize());
+          outKey.write(key.getBytes(), 0, key.getLength());
         }
         finally {
           outKey.close();
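
The get()/getSize() to getBytes()/getLength() substitutions here (and in
KeyDistribution.java below) track the deprecation of the old BytesWritable
accessors. Both pairs return the same backing buffer, and that buffer may be
longer than the valid data, so every read must be bounded by the logical
length. A minimal standalone sketch of the pattern (plain Hadoop API, no
zebra types):

    import java.util.Arrays;
    import org.apache.hadoop.io.BytesWritable;

    public class BytesWritableDemo {
      public static void main(String[] args) {
        BytesWritable bw = new BytesWritable();
        bw.set("hello".getBytes(), 0, 5);
        bw.setSize(3); // shrink the logical size; the buffer stays larger
        // getBytes() exposes the whole backing array, so always pair it
        // with getLength() to bound reads, exactly as this patch does:
        byte[] valid = Arrays.copyOfRange(bw.getBytes(), 0, bw.getLength());
        System.out.println(new String(valid)); // prints "hel"
      }
    }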

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Sat Feb 13 00:06:15 2010
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.zebra.io;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.PrintStream;
 
 /**

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Sat Feb 13 00:06:15 2010
@@ -150,7 +150,7 @@
    * Get the block distribution of all data that maps to the key bucket.
    */
   public BlockDistribution getBlockDistribution(BytesWritable key) {
-    ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+    ByteArray key0 = new ByteArray(key.getBytes(), 0, key.getLength());
     BlockDistribution bInfo = data.get(key0);
     if (bInfo == null) {
       throw new IllegalArgumentException("Invalid key");

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Sat Feb 13 00:06:15 2010
@@ -22,7 +22,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.pig.data.Tuple;
 

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Sat Feb 13 00:06:15 2010
@@ -140,7 +140,10 @@
  * 
  * }
  * </pre>
+ * 
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat} instead.
  */
+@Deprecated
 public class BasicTableOutputFormat implements
     OutputFormat<BytesWritable, Tuple> {
   private static final String OUTPUT_PATH = "mapred.lib.table.output.dir";

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Sat Feb 13 00:06:15 2010
@@ -136,7 +136,10 @@
  * <LI>{@link DataBag} : A DataBag is a collection of Tuples.
  * <LI>{@link Tuple} : Yes, Tuple itself can be a datum in another Tuple.
  * </UL>
+ * 
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.TableInputFormat} instead.
  */
+@Deprecated
 public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
   static Log LOG = LogFactory.getLog(TableInputFormat.class);
   

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Sat Feb 13 00:06:15 2010
@@ -30,7 +30,10 @@
 
 /**
  * Adaptor class to implement RecordReader on top of Scanner.
+ * 
+ * @deprecated Use {@link org.apache.hadoop.zebra.mapreduce.TableRecordReader} instead.
  */
+@Deprecated
 public class TableRecordReader implements RecordReader<BytesWritable, Tuple> {
   private final TableScanner scanner;
   private long count = 0;

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/package-info.java Sat Feb 13 00:06:15 2010
@@ -19,6 +19,8 @@
  * Providing {@link org.apache.hadoop.mapred.InputFormat} and
  * {@link org.apache.hadoop.mapred.OutputFormat} adaptor classes for Hadoop
  * Zebra Table.
+ * 
+ * This package is deprecated. Please use org.apache.hadoop.zebra.mapreduce instead.
  */
 package org.apache.hadoop.zebra.mapred;
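
For callers migrating off this deprecated package, job setup moves from
JobConf to the new-API org.apache.hadoop.mapreduce.Job, which is a
JobContext and can be passed directly to the static configuration methods
added by this commit. A hedged write-side driver sketch using only methods
visible in this patch; MyMapper and the argument handling are placeholders,
and setSchema() is used for brevity even though the patch deprecates it in
favor of setStorageInfo():

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
    import org.apache.pig.data.Tuple;

    public class ZebraWriteDriver {
      // Placeholder: a real job would emit (BytesWritable, Tuple) rows here.
      public static class MyMapper
          extends Mapper<Object, Object, BytesWritable, Tuple> {}

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "zebra-write");
        job.setJarByClass(ZebraWriteDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0); // map-only example
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Tuple.class);
        job.setOutputFormatClass(BasicTableOutputFormat.class);
        BasicTableOutputFormat.setOutputPath(job, new Path(args[0]));
        BasicTableOutputFormat.setSchema(job, "Name, Age, Salary, BonusPct");
        job.waitForCompletion(true);
        BasicTableOutputFormat.close(job); // table readable only after close
      }
    }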
 

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * Table expression for reading a BasicTable.
+ * 
+ * @see <a href="doc-files/examples/ReadABasicTable.java">Usage example for
+ *      BasicTableExpr</a>
+ */
+class BasicTableExpr extends TableExpr {
+  private Path path;
+
+  /**
+   * default constructor.
+   */
+  public BasicTableExpr() {
+    // no-op
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param path
+   *          Path of the BasicTable.
+   */
+  public BasicTableExpr(Path path) {
+    this.path = path;
+  }
+
+  /**
+   * Set the path.
+   * 
+   * @param path
+   *          path to the BasicTable.
+   * @return self.
+   */
+  public BasicTableExpr setPath(Path path) {
+    this.path = path;
+    return this;
+  }
+
+  /**
+   * Get the path.
+   * 
+   * @return the path to the BasicTable.
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  protected BasicTableExpr decodeParam(StringReader in) throws IOException {
+    String strPath = TableExprUtils.decodeString(in);
+    if (strPath == null) {
+      throw new RuntimeException("Incomplete expression");
+    }
+    path = new Path(strPath);
+    return this;
+  }
+
+  @Override
+  protected BasicTableExpr encodeParam(StringBuilder out) {
+    if (path == null) {
+      throw new RuntimeException("Incomplete expression");
+    }
+    TableExprUtils.encodeString(out, path.toString());
+    return this;
+  }
+
+  @Override
+  public List<LeafTableInfo> getLeafTables(String projection) {
+    ArrayList<LeafTableInfo> ret = new ArrayList<LeafTableInfo>(1);
+    ret.add(new LeafTableInfo(path, projection));
+    return ret;
+  }
+
+  @Override
+  public TableScanner getScanner(BytesWritable begin, BytesWritable end,
+      String projection, Configuration conf) throws IOException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    try {
+      reader.setProjection(projection);
+    } catch (ParseException e) {
+      throw new IOException("Projection parsing failed: " + e.getMessage());
+    } 
+    return reader.getScanner(begin, end, true);
+  } 
+
+  @Override
+  public Schema getSchema(Configuration conf) throws IOException {
+    return BasicTable.Reader.getSchema(path, conf);
+  }
+
+  @Override
+  public boolean sortedSplitCapable() {
+    return true;
+  }
+  
+  @Override
+  protected void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException
+  {
+    BasicTable.dumpInfo(path.toString(), ps, conf, indent);
+  }
+}
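
BasicTableExpr's encodeParam()/decodeParam() pair round-trips the table path
through a string form (via TableExprUtils, not shown in this hunk). A minimal
round-trip sketch; the helper class is hypothetical and must live in the same
package because both BasicTableExpr and the two methods are not public:

    package org.apache.hadoop.zebra.mapreduce;

    import java.io.IOException;
    import java.io.StringReader;
    import org.apache.hadoop.fs.Path;

    // Illustration only; not part of this patch.
    class BasicTableExprRoundTrip {
      public static void main(String[] args) throws IOException {
        BasicTableExpr expr = new BasicTableExpr(new Path("/data/mytable"));
        StringBuilder out = new StringBuilder();
        expr.encodeParam(out); // writes the encoded path
        BasicTableExpr restored = new BasicTableExpr();
        restored.decodeParam(new StringReader(out.toString()));
        System.out.println(restored.getPath()); // expected: /data/mytable
      }
    }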

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
+
+/**
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} class for creating a
+ * BasicTable.
+ * 
+ * Usage Example:
+ * <p>
+ * In the main program, add the following code.
+ * 
+ * <pre>
+ * job.setOutputFormatClass(BasicTableOutputFormat.class);
+ * Path outPath = new Path(&quot;path/to/the/BasicTable&quot;);
+ * BasicTableOutputFormat.setOutputPath(job, outPath);
+ * BasicTableOutputFormat.setSchema(job, &quot;Name, Age, Salary, BonusPct&quot;);
+ * </pre>
+ * 
+ * The above code does the following things:
+ * <UL>
+ * <LI>Set the output format class to BasicTableOutputFormat.
+ * <LI>Set the single path to the BasicTable to be created.
+ * <LI>Set the schema of the BasicTable to be created. In this case, the
+ * to-be-created BasicTable contains three columns with names "Name", "Age",
+ * "Salary", "BonusPct".
+ * </UL>
+ * 
+ * To create multiple output paths, the ZebraOutputPartition interface needs to be implemented:
+ * <pre>
+ * String multiLocs = &quot;commaSeparatedPaths&quot;;
+ * job.setOutputFormatClass(BasicTableOutputFormat.class);
+ * BasicTableOutputFormat.setMultipleOutputs(job, multiLocs,
+ * 		MultipleOutputsTest.OutputPartitionerClass.class);
+ * BasicTableOutputFormat.setSchema(job, &quot;Name, Age, Salary, BonusPct&quot;);
+ * </pre>
+ * 
+ * 
+ * The user's ZebraOutputPartition class should look like this:
+ * 
+ * <pre>
+ * 
+ *   static class OutputPartitionerClass implements ZebraOutputPartition {
+ *     &#064;Override
+ *     public int getOutputPartition(BytesWritable key, Tuple value) {
+ *       return someIndexInOutputPartitionList;
+ *     }
+ *   }
+ * </pre>
+ * 
+ * 
+ * The user Reducer code (or similarly Mapper code if it is a Map-only job)
+ * should look like the following:
+ * 
+ * <pre>
+ * static class MyReduceClass implements Reducer&lt;K, V, BytesWritable, Tuple&gt; {
+ *   // keep the tuple object for reuse.
+ *   Tuple outRow;
+ *   // indices of various fields in the output Tuple.
+ *   int idxName, idxAge, idxSalary, idxBonusPct;
+ * 
+ *   &#064;Override
+ *   public void configure(Job job) {
+ *     Schema outSchema = BasicTableOutputFormat.getSchema(job);
+ *     // create a tuple that conforms to the output schema.
+ *     outRow = TypesUtils.createTuple(outSchema);
+ *     // determine the field indices.
+ *     idxName = outSchema.getColumnIndex(&quot;Name&quot;);
+ *     idxAge = outSchema.getColumnIndex(&quot;Age&quot;);
+ *     idxSalary = outSchema.getColumnIndex(&quot;Salary&quot;);
+ *     idxBonusPct = outSchema.getColumnIndex(&quot;BonusPct&quot;);
+ *   }
+ * 
+ *   &#064;Override
+ *   public void reduce(K key, Iterator&lt;V&gt; values,
+ *       OutputCollector&lt;BytesWritable, Tuple&gt; output, Reporter reporter)
+ *       throws IOException {
+ *     String name;
+ *     int age;
+ *     int salary;
+ *     double bonusPct;
+ *     // ... Determine the value of the individual fields of the row to be inserted.
+ *     try {
+ *       outRow.set(idxName, name);
+ *       outRow.set(idxAge, new Integer(age));
+ *       outRow.set(idxSalary, new Integer(salary));
+ *       outRow.set(idxBonusPct, new Double(bonusPct));
+ *       output.collect(new BytesWritable(name.getBytes()), outRow);
+ *     }
+ *     catch (ExecException e) {
+ *       // should never happen
+ *     }
+ *   }
+ * 
+ *   &#064;Override
+ *   public void close() throws IOException {
+ *     // no-op
+ *   }
+ * 
+ * }
+ * </pre>
+ */
+public class BasicTableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
+	private static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
+	public static final String MULTI_OUTPUT_PATH = "mapreduce.lib.table.multi.output.dirs";
+	private static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
+	private static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
+	private static final String OUTPUT_SORTCOLUMNS = "mapreduce.lib.table.output.sortcolumns";
+	private static final String OUTPUT_COMPARATOR =  "mapreduce.lib.table.output.comparator";
+	static final String IS_MULTI = "multi";
+	public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = "zebra.output.partitioner.class";
+
+
+	/**
+	 * Set the multiple output paths of the BasicTable in JobContext
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @param commaSeparatedLocations
+	 *          The comma-separated output paths to the tables.
+	 *          Each path must either be non-existent or be an empty directory.
+	 * @param theClass
+	 * 	      Zebra output partitioner class
+	 */
+
+	public static void setMultipleOutputs(JobContext jobContext, String commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass)
+	throws IOException {
+		Configuration conf = jobContext.getConfiguration();
+		conf.set(MULTI_OUTPUT_PATH, commaSeparatedLocations);
+
+		if(conf.getBoolean(IS_MULTI, true) == false) {
+			throw new IllegalArgumentException("Job has been setup as single output path");
+		}
+		conf.setBoolean(IS_MULTI, true);
+		setZebraOutputPartitionClass(jobContext, theClass);	  
+	}
+
+	/**
+	 * Set the multiple output paths of the BasicTable in JobContext
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @param paths
+	 *          The list of paths 
+	 *          Each path must either be non-existent or be an empty directory.
+	 * @param theClass
+	 * 	      Zebra output partitioner class
+	 */
+
+	public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, Path... paths)
+	throws IOException {
+		Configuration conf = jobContext.getConfiguration();
+		FileSystem fs = FileSystem.get( conf );
+		Path path = paths[0].makeQualified(fs);
+		StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
+		for(int i = 1; i < paths.length;i++) {
+			str.append(StringUtils.COMMA_STR);
+			path = paths[i].makeQualified(fs);
+			str.append(StringUtils.escapeString(path.toString()));
+		}	  
+		conf.set(MULTI_OUTPUT_PATH, str.toString());
+
+		if(conf.getBoolean(IS_MULTI, true) == false) {
+			throw new IllegalArgumentException("Job has been setup as single output path");
+		}
+		conf.setBoolean(IS_MULTI, true);
+		setZebraOutputPartitionClass(jobContext, theClass);
+
+	}
+
+
+	/**
+	 * Get the multiple output paths of the BasicTable from JobContext
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return The output paths of the tables.
+	 */
+
+	public static Path[] getOutputPaths(JobContext jobContext)
+	throws IOException {
+		Configuration conf = jobContext.getConfiguration();
+
+		Path[] result;
+		String paths = conf.get(MULTI_OUTPUT_PATH);
+		String path = conf.get(OUTPUT_PATH);
+
+		if(paths != null && path != null) {
+			throw new IllegalArgumentException("Illegal output paths specs. Both multi and single output locs are set");
+		}	
+
+		if(conf.getBoolean(IS_MULTI, false) == true) {	  
+			if (paths == null || paths.equals("")) {
+				throw new IllegalArgumentException("Illegal multi output paths");
+			}	    
+			String [] list = StringUtils.split(paths);
+			result = new Path[list.length];
+			for (int i = 0; i < list.length; i++) {
+				result[i] = new Path(StringUtils.unEscapeString(list[i]));
+			}
+		} else {
+			if (path == null || path.equals("")) {
+				throw new IllegalArgumentException("Cannot find output path");
+			}	    
+			result = new Path[1];
+			result[0] = new Path(path);
+		}
+
+		return result;
+
+	}
+
+
+	private static void setZebraOutputPartitionClass(
+			JobContext jobContext, Class<? extends ZebraOutputPartition> theClass) throws IOException {
+		if (!ZebraOutputPartition.class.isAssignableFrom(theClass))
+			throw new IOException(theClass+" not "+ZebraOutputPartition.class.getName());
+		jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, theClass.getName());
+	}
+
+
+	public static Class<? extends ZebraOutputPartition> getZebraOutputPartitionClass(JobContext jobContext) throws IOException {
+		Configuration conf = jobContext.getConfiguration();
+
+		Class<?> theClass;	  
+		String valueString = conf.get(ZEBRA_OUTPUT_PARTITIONER_CLASS);
+		if (valueString == null)
+			throw new IOException("zebra output partitioner class not found");
+		try {
+			theClass = conf.getClassByName(valueString);
+		} catch (ClassNotFoundException e) {
+			throw new IOException(e);
+		}	  
+
+		if (theClass != null && !ZebraOutputPartition.class.isAssignableFrom(theClass))
+			throw new IOException(theClass+" not "+ZebraOutputPartition.class.getName());
+		else if (theClass != null)
+			return theClass.asSubclass(ZebraOutputPartition.class);
+		else
+			return null;
+
+	}
+
+
+
+	/**
+	 * Set the output path of the BasicTable in JobContext
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @param path
+	 *          The output path to the table. The path must either be
+	 *          non-existent or be an empty directory.
+	 */
+	public static void setOutputPath(JobContext jobContext, Path path) {
+		Configuration conf = jobContext.getConfiguration();
+		conf.set(OUTPUT_PATH, path.toString());
+		if(conf.getBoolean(IS_MULTI, false) == true) {
+			throw new IllegalArgumentException("Job has been setup as multi output paths");
+		}
+		conf.setBoolean(IS_MULTI, false);
+
+	}
+
+	/**
+	 * Get the output path of the BasicTable from JobContext
+	 * 
+	 * @param jobContext
+	 *          jobContext object
+	 * @return The output path.
+	 */
+	public static Path getOutputPath(JobContext jobContext) {
+		Configuration conf = jobContext.getConfiguration();
+		String path = conf.get(OUTPUT_PATH);
+		return (path == null) ? null : new Path(path);
+	}
+
+	/**
+	 * Set the table schema in JobContext
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @param schema
+	 *          The schema of the BasicTable to be created. For the initial
+	 *          implementation, the schema string is simply a comma separated list
+	 *          of column names, such as "Col1, Col2, Col3".
+	 * 
+	 * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+	 */
+	public static void setSchema(JobContext jobContext, String schema) {
+		Configuration conf = jobContext.getConfiguration();
+		conf.set(OUTPUT_SCHEMA, Schema.normalize(schema));
+	}
+
+	/**
+	 * Get the table schema in JobContext.
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return The output schema of the BasicTable. If the schema is not defined
+	 *         in the jobContext object at the time of the call, null will be returned.
+	 */
+	public static Schema getSchema(JobContext jobContext) throws ParseException {
+		Configuration conf = jobContext.getConfiguration();
+		String schema = conf.get(OUTPUT_SCHEMA);
+		if (schema == null) {
+			return null;
+		}
+		//schema = schema.replaceAll(";", ",");
+		return new Schema(schema);
+	}
+
+	private static KeyGenerator makeKeyBuilder(byte[] elems) {
+		ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+		for (int i = 0; i < elems.length; ++i) {
+			exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+		}
+		return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+	}
+
+	/**
+	 * Generates a zebra-specific sort key generator, which is used to produce
+	 * the BytesWritable sort key. The table's sort column(s) are used to build
+	 * this object.
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return Object of type zebra.pig.comparator.KeyGenerator. 
+	 *         
+	 */
+	public static Object getSortKeyGenerator(JobContext jobContext) throws IOException, ParseException {
+		SortInfo sortInfo = getSortInfo( jobContext );
+		Schema schema     = getSchema(jobContext);
+		String[] sortColNames = sortInfo.getSortColumnNames();
+
+		byte[] types = new byte[sortColNames.length];
+		for(int i =0 ; i < sortColNames.length; ++i){
+			types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+		}
+		KeyGenerator builder = makeKeyBuilder(types);
+		return builder;
+
+	}
+
+
+	/**
+	 * Generates a BytesWritable sort key for the input tuple, using the
+	 * key generator provided.
+	 *
+	 * @param builder
+	 *         Opaque key generator created by the getSortKeyGenerator() method
+	 * @param t
+	 *         Tuple to create the sort key from
+	 * @return BytesWritable key 
+	 *
+	 */
+	public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+		KeyGenerator kg = (KeyGenerator) builder;
+		return kg.generateKey(t);
+	}
+
+
+
+
+	/**
+	 * Set the table storage hint in JobContext. This should be called after
+	 * setSchema() has been called.
+	 * <br> <br>
+	 * 
+	 * Note that the "secure by" feature is experimental now and subject to
+	 * changes in the future.
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @param storehint
+	 *          The storage hint of the BasicTable to be created. The format would
+	 *          be like "[f1, f2.subfld]; [f3, f4]".
+	 * 
+	 * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+	 */
+	public static void setStorageHint(JobContext jobContext, String storehint) throws ParseException, IOException {
+		Configuration conf = jobContext.getConfiguration();
+		String schema = conf.get(OUTPUT_SCHEMA);
+
+		if (schema == null)
+			throw new ParseException("Schema has not been set");
+
+		// for sanity check purpose only
+		new Partition(schema, storehint, null);
+
+		conf.set(OUTPUT_STORAGEHINT, storehint);
+	}
+
+	/**
+	 * Get the table storage hint in JobContext.
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return The storage hint of the BasicTable. If the storage hint is not
+	 *         defined in the jobContext object at the time of the call, an empty string
+	 *         will be returned.
+	 */
+	public static String getStorageHint(JobContext jobContext) throws ParseException {
+		Configuration conf = jobContext.getConfiguration();
+		String storehint = conf.get(OUTPUT_STORAGEHINT);
+		return storehint == null ? "" : storehint;
+	}
+
+	/**
+	 * Set the sort info
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 *          
+	 * @param sortColumns
+	 *          Comma-separated sort column names
+	 *          
+	 * @param comparatorClass
+	 *          comparator class name; null for default
+	 *
+	 * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.
+	 */
+	public static void setSortInfo(JobContext jobContext, String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+		Configuration conf = jobContext.getConfiguration();
+		conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+		if (comparatorClass != null)
+			conf.set(OUTPUT_COMPARATOR, TFile.COMPARATOR_JCLASS+comparatorClass.getName());
+	}
+
+	/**
+	 * Set the sort info
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 *          
+	 * @param sortColumns
+	 *          Comma-separated sort column names
+	 *          
+	 * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo)} instead.          
+	 */
+	public static void setSortInfo(JobContext jobContext, String sortColumns) {
+		jobContext.getConfiguration().set(OUTPUT_SORTCOLUMNS, sortColumns);
+	}  
+
+	/**
+	 * Set the table storage info, including schema, storage hint and sort info.
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 *          
+	 * @param zSchema The ZebraSchema object containing schema information.
+	 *  
+	 * @param zStorageHint The ZebraStorageHint object containing storage hint information.
+	 * 
+	 * @param zSortInfo The ZebraSortInfo object containing sorting information.
+	 *          
+	 */
+	public static void setStorageInfo(JobContext jobContext, ZebraSchema zSchema, ZebraStorageHint zStorageHint, ZebraSortInfo zSortInfo) 
+	throws ParseException, IOException {
+		String schemaStr = null;
+		String storageHintStr = null;
+
+		/* validity check on schema*/
+		if (zSchema == null) {
+			throw new IllegalArgumentException("ZebraSchema object cannot be null.");
+		} else {
+			schemaStr = zSchema.toString();
+		}
+
+		Schema schema = null;
+		try {
+			schema = new Schema(schemaStr);
+		} catch (ParseException e) {
+			throw new ParseException("[" + zSchema + "] " + " is not a valid schema string: " + e.getMessage());
+		}
+
+		/* validity check on storage hint*/
+		if (zStorageHint == null) {
+			storageHintStr = "";
+		} else {
+			storageHintStr = zStorageHint.toString();
+		}
+
+		try {
+			new Partition(schemaStr, storageHintStr, null);
+		} catch (ParseException e) {
+			throw new ParseException("[" + zStorageHint + "] " + " is not a valid storage hint string: " + e.getMessage()  ); 
+		} catch (IOException e) {
+			throw new ParseException("[" + zStorageHint + "] " + " is not a valid storage hint string: " + e.getMessage()  );
+		}
+
+		Configuration conf = jobContext.getConfiguration();
+		conf.set(OUTPUT_SCHEMA, schemaStr);
+		conf.set(OUTPUT_STORAGEHINT, storageHintStr);
+
+		/* validity check on sort info if user specifies it */
+		if (zSortInfo != null) {
+			String sortColumnsStr = zSortInfo.getSortColumns();
+			String comparatorStr = zSortInfo.getComparator();      
+
+			/* Check existence of comparable class if user specifies it */
+			if (comparatorStr != null && !comparatorStr.isEmpty()) {
+				try {
+					conf.getClassByName(comparatorStr.substring(TFile.COMPARATOR_JCLASS.length()).trim());
+				} catch (ClassNotFoundException e) {
+					throw new IOException("comparator Class cannot be found : " + e.getMessage());        
+				}
+			}
+
+			try {
+				SortInfo.parse(sortColumnsStr, schema, comparatorStr);
+			} catch (IOException e) {
+				throw new IOException("[" + sortColumnsStr + " + " + comparatorStr + "] " 
+						+ "is not a valid sort configuration: " + e.getMessage());
+			}
+
+			if (sortColumnsStr != null)
+				conf.set(OUTPUT_SORTCOLUMNS, sortColumnsStr);
+			if (comparatorStr != null)
+				conf.set(OUTPUT_COMPARATOR, comparatorStr);
+		}
+	}
+
+	/**
+	 * Get the SortInfo object 
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return SortInfo object; null if the Zebra table is unsorted 
+	 *
+	 */
+	public static SortInfo getSortInfo(JobContext jobContext)throws IOException
+	{
+		Configuration conf = jobContext.getConfiguration();
+		String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+		if (sortColumns == null)
+			return null;
+		Schema schema = null;
+		try {
+			schema = getSchema(jobContext);
+		} catch (ParseException e) {
+			throw new IOException("Schema parsing failure : "+e.getMessage());
+		}
+		if (schema == null)
+			throw new IOException("Schema not defined");
+		String comparator = getComparator(jobContext);
+		return SortInfo.parse(sortColumns, schema, comparator);
+	}
+
+	/**
+	 * Get the  comparator for sort columns
+	 *
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return  comparator String
+	 *
+	 */
+	private static String getComparator(JobContext jobContext)
+	{
+		return jobContext.getConfiguration().get(OUTPUT_COMPARATOR);
+	}
+
+	/**
+	 * Get the output table as specified in JobContext. It is useful for applications
+	 * to add more meta data after all rows have been added to the table.
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @return The output BasicTable.Writer object.
+	 * @throws IOException
+	 */
+	private static BasicTable.Writer[] getOutput(JobContext jobContext) throws IOException {
+		Path[] paths = getOutputPaths(jobContext);
+		BasicTable.Writer[] writers = new BasicTable.Writer[paths.length]; 
+		for(int i = 0; i < paths.length; i++) {
+			writers[i] = new BasicTable.Writer(paths[i], jobContext.getConfiguration());
+		}
+
+		return writers;
+	}
+
+	/**
+	 * Note: we perform the initialization of the table here, so we expect this
+	 * to be called before
+	 * {@link BasicTableOutputFormat#getRecordWriter(TaskAttemptContext)}
+	 * 
+	 * @see OutputFormat#checkOutputSpecs(JobContext)
+	 */
+	@Override
+	public void checkOutputSpecs(JobContext jobContext)
+	throws IOException {
+		Configuration conf = jobContext.getConfiguration();
+		String schema = conf.get(OUTPUT_SCHEMA);
+		if (schema == null) {
+			throw new IllegalArgumentException("Cannot find output schema");
+		}
+		String storehint, sortColumns, comparator;
+		try {
+			storehint = getStorageHint(jobContext);
+			sortColumns = (getSortInfo(jobContext) == null ? null : SortInfo.toSortString(getSortInfo(jobContext).getSortColumnNames()));
+			comparator = getComparator( jobContext );
+		}
+		catch (ParseException e) {
+			throw new IOException(e);
+		}
+
+		Path [] paths = getOutputPaths(jobContext);
+
+		for(int i = 0; i < paths.length; ++i) {
+			BasicTable.Writer writer =
+				new BasicTable.Writer(paths[i], schema, storehint, sortColumns, comparator, conf);
+			writer.finish();
+		}	
+	}
+
+	/**
+	 * @see OutputFormat#getRecordWriter(TaskAttemptContext)
+	 */
+	@Override
+	public RecordWriter<BytesWritable, Tuple> getRecordWriter(TaskAttemptContext taContext)
+	throws IOException {
+		String path = taContext.getConfiguration().get(OUTPUT_PATH);
+		return new TableRecordWriter(path, taContext);
+	}
+
+	/**
+	 * Close the output BasicTable. No more rows can be added to the table. A
+	 * BasicTable is not visible for reading until it is "closed".
+	 * 
+	 * @param jobContext
+	 *          The JobContext object.
+	 * @throws IOException
+	 */
+	public static void close(JobContext jobContext) throws IOException {
+		BasicTable.Writer tables[] = getOutput(jobContext);
+		for(int i =0; i < tables.length; ++i) {
+			tables[i].close();    	
+		}
+	}
+
+	@Override
+	public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
+	throws IOException, InterruptedException {
+		return new TableOutputCommitter( taContext ) ;
+	}
+}
+
+class TableOutputCommitter extends OutputCommitter {
+	public TableOutputCommitter(TaskAttemptContext taContext) {
+
+	}
+
+	@Override
+	public void abortTask(TaskAttemptContext taContext) throws IOException {
+	}
+
+	@Override
+	public void cleanupJob(JobContext jobContext) throws IOException {
+	}
+
+	@Override
+	public void commitTask(TaskAttemptContext taContext) throws IOException {
+	}
+
+	@Override
+	public boolean needsTaskCommit(TaskAttemptContext taContext)
+	throws IOException {
+		return false;
+	}
+
+	@Override
+	public void setupJob(JobContext jobContext) throws IOException {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void setupTask(TaskAttemptContext taContext) throws IOException {
+		// TODO Auto-generated method stub
+
+	}
+
+}
+
+/**
+ * Adaptor class for BasicTable RecordWriter.
+ */
+class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+	private final TableInserter inserter[];
+	private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = null;
+
+
+	public TableRecordWriter(String path, TaskAttemptContext context) throws IOException {	
+		Configuration conf = context.getConfiguration();
+		if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == true) {	  
+			op = (org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition) 
+			ReflectionUtils.newInstance(BasicTableOutputFormat.getZebraOutputPartitionClass(context), conf);
+
+		}  
+		Path [] paths = BasicTableOutputFormat.getOutputPaths(context);
+		inserter = new TableInserter[paths.length];
+                String inserterName = "part-" + context.getTaskAttemptID().getTaskID().getId();
+		for(int i = 0; i < paths.length; ++i) {
+			BasicTable.Writer writer =
+				new BasicTable.Writer(paths[i], conf);
+			this.inserter[i] = writer.getInserter( inserterName, true);
+		}	
+	}
+
+	@Override
+	public void close(TaskAttemptContext context) throws IOException {
+		for(int i = 0; i < this.inserter.length; ++i) {  
+			inserter[i].close();
+		}  
+	}
+
+	@Override
+	public void write(BytesWritable key, Tuple value) throws IOException {
+		if(op != null ) {	  
+			int idx = op.getOutputPartition(key, value);
+			if(idx < 0 || (idx >= inserter.length)) {
+				throw new IllegalArgumentException("index returned by getOutputPartition is out of range");
+			}
+			inserter[idx].insert(key, value);
+		} else {
+			inserter[0].insert(key, value);
+		}
+	}
+
+}
+
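
Taken together, setStorageInfo() replaces the deprecated
setSchema()/setStorageHint()/setSortInfo() trio, and getSortKeyGenerator()
plus getSortKey() turn each output Tuple into the BytesWritable sort key of
a sorted table. A hedged sketch; the createZebra* factory names are
assumptions about the ZebraSchema/ZebraStorageHint/ZebraSortInfo classes
added by this commit but not shown in this part of the diff:

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
    import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
    import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
    import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
    import org.apache.pig.data.Tuple;

    public class SortedWriteSketch {
      static void configure(Job job) throws Exception {
        // Assumed factory methods of the new typed API:
        ZebraSchema zs = ZebraSchema.createZebraSchema("Name, Age");
        ZebraStorageHint zh =
            ZebraStorageHint.createZebraStorageHint("[Name]; [Age]");
        ZebraSortInfo zi = ZebraSortInfo.createZebraSortInfo("Name", null);
        BasicTableOutputFormat.setStorageInfo(job, zs, zh, zi);
      }

      static BytesWritable keyFor(Job job, Tuple row) throws Exception {
        // The generator comes back as an opaque Object and is fed back in:
        Object keyGen = BasicTableOutputFormat.getSortKeyGenerator(job);
        return BasicTableOutputFormat.getSortKey(keyGen, row);
      }
    }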

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CachedTableScanner.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Utility class for those interested in writing their own TableExpr classes.
+ * This class encapsulates a regular table scanner and changes the way how keys
+ * and rows are accessed (with an internal cache).
+ */
+final class CachedTableScanner implements Closeable {
+  private BytesWritable key;
+  private Tuple row;
+  private boolean keyReady;
+  private boolean rowReady;
+  private int index;
+  private TableScanner scanner;
+
+  /**
+   * Constructor
+   * 
+   * @param scanner
+   *          The scanner to be encapsulated
+   * @param index
+   *          The index of the underlying table in a union
+   * @throws IOException
+   */
+  public CachedTableScanner(TableScanner scanner, int index) throws IOException {
+    key = new BytesWritable();
+    row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+    keyReady = false;
+    rowReady = false;
+    this.index = index;
+    this.scanner = scanner;
+  }
+
+  /**
+   * Get the key at cursor.
+   * 
+   * @return The key at cursor
+   * @throws IOException
+   */
+  public BytesWritable getKey() throws IOException {
+    if (!keyReady) {
+      scanner.getKey(key);
+      keyReady = true;
+    }
+    return key;
+  }
+
+  /**
+   * Get the value (Tuple) at cursor.
+   * 
+   * @return the value at cursor.
+   * @throws IOException
+   */
+  public Tuple getValue() throws IOException {
+    if (!rowReady) {
+      scanner.getValue(row);
+      rowReady = true;
+    }
+    return row;
+  }
+
+  /**
+   * Get the table index in a union
+   * 
+   * @return the table index in union
+   */
+  public int getIndex() {
+    return index;
+  }
+
+  /**
+   * Seek to a row whose key is greater than or equal to the input key.
+   * 
+   * @param inKey
+   *          the input key.
+   * @return true if an exact matching key is found. false otherwise.
+   * @throws IOException
+   */
+  public boolean seekTo(BytesWritable inKey) throws IOException {
+    boolean ret = scanner.seekTo(inKey);
+    reset();
+    return ret;
+  }
+
+  /**
+   * Advance the cursor.
+   * 
+   * @return whether the cursor actually moves.
+   * @throws IOException
+   */
+  public boolean advance() throws IOException {
+    boolean ret = scanner.advance();
+    reset();
+    return ret;
+  }
+
+  private void reset() throws IOException {
+    row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+    keyReady = false;
+    rowReady = false;
+  }
+
+  /**
+   * Is cursor at end?
+   * 
+   * @return Whether cursor is at the end.
+   * @throws IOException
+   */
+  public boolean atEnd() throws IOException {
+    return scanner.atEnd();
+  }
+
+  /**
+   * Get the projection used by this scanner.
+   * 
+   * @return The projection string of this scanner.
+   */
+  public String getProjection() {
+    return scanner.getProjection();
+  }
+
+  /**
+   * Get the schema of the tuples returned from this scanner.
+   * 
+   * @return The projected schema.
+   */
+  public Schema getSchema() {
+    return scanner.getSchema();
+  }
+
+  /**
+   * Seek to the end of the scanner.
+   * 
+   * @throws IOException
+   */
+  public void seekToEnd() throws IOException {
+    reset();
+    scanner.seekToEnd();
+  }
+
+  /**
+   * Close the scanner, release all resources.
+   */
+  @Override
+  public void close() throws IOException {
+    scanner.close();
+  }
+}
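
The caching contract above is easiest to see in a small driver. The sketch below is
illustrative only (not part of this patch); since CachedTableScanner is package-private,
such code would have to live in org.apache.hadoop.zebra.mapreduce.

    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.pig.data.Tuple;

    // Hypothetical helper: repeated getKey()/getValue() calls between advance()
    // calls hit the cache and cost no extra I/O; advance() invalidates both.
    class ScanSketch {
      static void dumpAll(CachedTableScanner scanner) throws IOException {
        while (!scanner.atEnd()) {
          BytesWritable key = scanner.getKey(); // read once per cursor position
          Tuple row = scanner.getValue();       // cached until the cursor moves
          System.out.println(key + " -> " + row);
          scanner.advance();                    // resets the key/row cache
        }
      }
    }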

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/CompositeTableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+
+/**
+ * An intermediate class to build concrete composite table expression classes.
+ * Lengthy names are chosen to avoid naming conflicts.
+ */
+abstract class CompositeTableExpr extends TableExpr {
+  public final static int INDENT_UNIT = 2;
+  protected ArrayList<TableExpr> composite;
+
+  protected CompositeTableExpr() {
+    composite = new ArrayList<TableExpr>();
+  }
+
+  protected CompositeTableExpr(int n) {
+    composite = new ArrayList<TableExpr>(n);
+    for (int i = 0; i < n; ++i) {
+      composite.add(null);
+    }
+  }
+
+  protected CompositeTableExpr addCompositeTable(TableExpr expr) {
+    composite.add(expr);
+    return this;
+  }
+
+  protected CompositeTableExpr addCompositeTables(TableExpr[] exprs) {
+    composite.ensureCapacity(composite.size() + exprs.length);
+    for (TableExpr expr : exprs) {
+      composite.add(expr);
+    }
+    return this;
+  }
+
+  protected CompositeTableExpr addCompositeTables(
+      Collection<? extends TableExpr> exprs) {
+    composite.addAll(exprs);
+    return this;
+  }
+
+  protected CompositeTableExpr setCompositeTable(int i, TableExpr expr) {
+    composite.set(i, expr);
+    return this;
+  }
+
+  protected List<TableExpr> getCompositeTables() {
+    return composite;
+  }
+
+  protected TableExpr getCompositeTable(int i) {
+    TableExpr expr = composite.get(i);
+    if (expr == null) {
+      throw new RuntimeException("Incomplete N-ary expression: sub-expression " + i + " is not set");
+    }
+
+    return expr;
+  }
+
+  @Override
+  protected CompositeTableExpr decodeParam(StringReader in) throws IOException {
+    composite.clear();
+    int n = TableExprUtils.decodeInt(in);
+    composite.ensureCapacity(n);
+    for (int i = 0; i < n; ++i) {
+      TableExpr expr = TableExpr.parse(in);
+      composite.add(expr);
+    }
+
+    return this;
+  }
+
+  @Override
+  protected CompositeTableExpr encodeParam(StringBuilder out) {
+    int n = composite.size();
+    TableExprUtils.encodeInt(out, n);
+    for (int i = 0; i < n; ++i) {
+      getCompositeTable(i).encode(out);
+    }
+
+    return this;
+  }
+
+  @Override
+  public List<LeafTableInfo> getLeafTables(String projection) {
+    ArrayList<LeafTableInfo> ret = new ArrayList<LeafTableInfo>();
+
+    int n = composite.size();
+    for (int i = 0; i < n; ++i) {
+      ret.addAll(getCompositeTable(i).getLeafTables(projection));
+    }
+
+    return ret;
+  }
+
+  protected static class RowMappingEntry {
+    final int rowIndex;
+    final int fieldIndex;
+
+    public RowMappingEntry(int ri, int fi) {
+      rowIndex = ri;
+      fieldIndex = fi;
+    }
+
+    public int getRowIndex() {
+      return rowIndex;
+    }
+
+    public int getFieldIndex() {
+      return fieldIndex;
+    }
+  }
+
+  protected static class InferredProjection {
+    /**
+     * projections on each table in the composite. It should be of the same
+     * dimension as composite.
+     */
+    final Schema[] subProjections;
+
+    /**
+     * The actual (adjusted) input projection.
+     */
+    final Schema projection;
+
+    /**
+     * For each column in the input projection, what is the corresponding sub
+     * table (row index) and the corresponding projected column (field index).
+     */
+    final RowMappingEntry[] colMapping;
+
+    InferredProjection(List<String>[] subProj, String[] proj,
+        Map<String, RowMappingEntry> colMap) throws ParseException {
+      subProjections = new Schema[subProj.length];
+      for (int i = 0; i < subProj.length; ++i) {
+        List<String> subProjection = subProj[i];
+        subProjections[i] =
+            new Schema(subProjection.toArray(new String[subProjection.size()]));
+      }
+      
+      projection = new Schema(proj);
+      
+      this.colMapping = new RowMappingEntry[proj.length];
+      
+      for (int i = 0; i < proj.length; ++i) {
+        colMapping[i] = colMap.get(proj[i]);
+      }
+    }
+
+    public Schema[] getSubProjections() {
+      return subProjections;
+    }
+
+    public RowMappingEntry[] getColMapping() {
+      return colMapping;
+    }
+
+    public Schema getProjection() {
+      return projection;
+    }
+  }
+
+  /**
+   * Given the input projection, infer the projections that should be applied on
+   * individual table in the composite.
+   * 
+   * @param projection
+   *          The expected projection
+   * @return The inferred projection with other information.
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  protected InferredProjection inferProjection(String projection,
+      Configuration conf)
+      throws IOException, ParseException {
+    int n = composite.size();
+    ArrayList<String>[] subProjections = new ArrayList[n];
+    for (int i = 0; i < n; ++i) {
+      subProjections[i] = new ArrayList<String>();
+    }
+    
+    String[] columns;
+    LinkedHashMap<String, Integer> colNameMap =
+        new LinkedHashMap<String, Integer>();
+    TreeMap<String, RowMappingEntry> colMapping =
+        new TreeMap<String, RowMappingEntry>();
+
+    /**
+     * Create a collection of all column names and the corresponding index to
+     * the composite.
+     */
+    for (int i = 0; i < n; ++i) {
+      String[] cols = getCompositeTable(i).getSchema(conf).getColumns();
+      for (int j = 0; j < cols.length; ++j) {
+        if (colNameMap.get(cols[j]) != null) {
+          colNameMap.put(cols[j], -1); // special marking on duplicated names.
+        }
+        else {
+          colNameMap.put(cols[j], i);
+        }
+      }
+    }
+
+    if (projection == null) {
+      Set<String> keySet = colNameMap.keySet();
+      columns = keySet.toArray(new String[keySet.size()]);
+    }
+    else {
+      columns = projection.trim().split(Schema.COLUMN_DELIMITER);
+    }
+
+    for (int i = 0; i < columns.length; ++i) {
+      String col = columns[i];
+
+      if (colMapping.get(col) != null) {
+        throw new IllegalArgumentException(
+            "Duplicate column names in projection");
+      }
+
+      Integer rowIndex = colNameMap.get(col);
+      if (rowIndex == null) {
+        colMapping.put(col, null);
+      }
+      else {
+        if (rowIndex < 0) {
+          // The column name appears in more than one table in composite.
+          throw new RuntimeException("Ambiguous column in projection: "
+              + col);
+        }
+        subProjections[rowIndex].add(columns[i]);
+        colMapping.put(col, new RowMappingEntry(rowIndex,
+            subProjections[rowIndex].size() - 1));
+      }
+    }
+
+    return new InferredProjection(subProjections, columns, colMapping);
+  }
+
+  @Override
+  public Schema getSchema(Configuration conf) throws IOException {
+    Schema result = new Schema();
+    for (Iterator<TableExpr> it = composite.iterator(); it.hasNext();) {
+      TableExpr e = it.next();
+      try {
+        result.unionSchema(e.getSchema(conf));
+      } catch (ParseException exc) {
+        throw new IOException("Schema parsing failed: " + exc.getMessage());
+      }
+    }
+    return result;
+  }
+  
+  @Override
+  public boolean sortedSplitCapable() {
+    for (Iterator<TableExpr> it = composite.iterator(); it.hasNext();) {
+      TableExpr e = it.next();
+      if (!e.sortedSplitCapable()) return false;
+    }
+
+    return true;
+  }
+  
+  @Override
+  protected void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException
+  {
+    for (Iterator<TableExpr> it = composite.iterator(); it.hasNext();) {
+      TableExpr e = it.next();
+      e.dumpInfo(ps, conf, indent+INDENT_UNIT);
+    }
+  }
+}
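
The duplicate-detection trick in inferProjection is compact enough to miss: the first
table defining a column records its table index in the map, and any later definition
overwrites the entry with -1 to flag ambiguity. A standalone sketch of just that step
(illustrative only, not part of this patch; the sub-table schemas are made up):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ColumnMapSketch {
      public static void main(String[] args) {
        String[][] tables = { { "a", "b" }, { "b", "c" } }; // hypothetical sub-table columns
        Map<String, Integer> colNameMap = new LinkedHashMap<String, Integer>();
        for (int i = 0; i < tables.length; ++i) {
          for (String col : tables[i]) {
            // -1 marks a name seen in more than one table; projecting such a
            // column later raises "Ambiguous column in projection".
            colNameMap.put(col, colNameMap.containsKey(col) ? -1 : i);
          }
        }
        System.out.println(colNameMap); // {a=0, b=-1, c=1}
      }
    }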

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/NullScanner.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A scanner that contains no rows.
+ */
+class NullScanner implements TableScanner {
+  String projection;
+
+  public NullScanner(String projection) {
+    this.projection = projection;
+  }
+  
+  @Override
+  public boolean advance() throws IOException {
+    return false;
+  }
+
+  @Override
+  public boolean atEnd() throws IOException {
+    return true;
+  }
+
+  @Override
+  public String getProjection() {
+    return projection;
+  }
+  
+  @Override
+  public Schema getSchema() {
+    Schema result;
+    try {
+       result = Projection.toSchema(projection);
+    } catch (ParseException e) {
+      throw new AssertionError("Invalid Projection: " + e.getMessage());
+    }
+    return result;
+  }
+
+  @Override
+  public void getKey(BytesWritable key) throws IOException {
+    throw new EOFException("No more rows to read");
+  }
+
+  @Override
+  public void getValue(Tuple row) throws IOException {
+    throw new EOFException("No more rows to read");
+  }
+
+  @Override
+  public boolean seekTo(BytesWritable key) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void seekToEnd() throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Do nothing
+  }
+}
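
NullScanner lets expression code hand back a real, empty scanner for a vacuous key
range instead of null, so callers can run their usual scan loop unchanged. A brief
illustration (not part of this patch; since NullScanner is package-private, this would
sit in org.apache.hadoop.zebra.mapreduce, and the projection string is made up):

    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.zebra.io.TableScanner;

    class NullScanSketch {
      static void demo() throws IOException {
        TableScanner scanner = new NullScanner("a, b");
        System.out.println(scanner.atEnd());    // true: already at end
        System.out.println(scanner.advance());  // false: the cursor never moves
        System.out.println(scanner.seekTo(new BytesWritable())); // false: no key matches
        scanner.close();                        // a no-op
      }
    }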

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * Table Expression - expression to describe an input table.
+ */
+abstract class TableExpr {
+  private boolean sorted = false;
+
+  /**
+   * Factory method to create a TableExpr from a string.
+   * 
+   * @param in
+   *          The string stream that is pointed at the beginning of the table
+   *          expression string to be parsed.
+   * @return The instantiated TableExpr object. The string stream will move past
+   *         the table expression string.
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static TableExpr parse(StringReader in) throws IOException {
+    String clsName = TableExprUtils.decodeString(in);
+    try {
+      Class<? extends TableExpr> tblExprCls =
+          (Class<? extends TableExpr>) Class.forName(clsName);
+      return tblExprCls.newInstance().decodeParam(in);
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Failed to instantiate TableExpr class " + clsName
+          + ": " + e.toString());
+    }
+  }
+
+  /**
+   * Encode the expression to a string stream.
+   * 
+   * @param out
+   *          The string stream that we should encode the expression into.
+   */
+  public final void encode(StringBuilder out) {
+    TableExprUtils.encodeString(out, getClass().getName());
+    encodeParam(out);
+  }
+
+  /**
+   * Decode the parameters of the expression from the input stream.
+   * 
+   * @param in
+   *          The string stream that is pointed at the beginning of the encoded
+   *          parameters.
+   * @return Reference to itself, and the TableExpr that has its parameters set
+   *         from the string stream. The string stream will move past the
+   *         encoded parameters for this expression.
+   */
+  protected abstract TableExpr decodeParam(StringReader in) throws IOException;
+
+  /**
+   * Encode the parameters of the expression into the string stream.
+   * 
+   * @param out
+   *          The string stream that we write encoded parameters into.
+   * @return Reference to itself.
+   */
+  protected abstract TableExpr encodeParam(StringBuilder out);
+
+  /**
+   * Get a TableScanner from the TableExpr object. This method only needs to be
+   * implemented by table expressions that support sorted split.
+   * 
+   * @param begin
+   *          the Begin key (inclusive). Can be null, meaning starting from the
+   *          first row of the table.
+   * @param end
+   *          the End key (exclusive). Can be null, meaning scan to the last row
+   *          of the table.
+   * @param projection
+   *          The projection schema. It should never be null.
+   * @see Schema
+   * @return A TableScanner object.
+   */
+  public TableScanner getScanner(BytesWritable begin,
+      BytesWritable end, String projection, Configuration conf)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Get a scanner with an unsorted split.
+   * 
+   * @param split
+   *          The range split.
+   * @param projection
+   *          The projection schema. It should never be null.
+   * @param conf
+   *          The configuration
+   * @return A table scanner.
+   * @throws IOException
+   */
+  public TableScanner getScanner(UnsortedTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), conf);
+    reader.setProjection(projection);
+    return reader.getScanner(split.getSplit(), true);
+  }
+  
+  /**
+   * Get a scanner from a row split.
+   * 
+   * @param split
+   *          The row split.
+   * @param projection
+   *          The projection schema. It should never be null.
+   * @param conf
+   *          The configuration
+   * @return A table scanner.
+   * @throws IOException
+   */
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), conf);
+    reader.setProjection(projection);
+    return reader.getScanner(true, split.getSplit());
+  }
+  
+  /**
+   * A leaf table corresponds to a materialized table. It is represented by the
+   * path to the BasicTable and the projection.
+   */
+  public static final class LeafTableInfo {
+    private final Path path;
+    private final String projection;
+
+    public LeafTableInfo(Path path, String projection) {
+      this.path = path;
+      this.projection = projection;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public String getProjection() {
+      return projection;
+    }
+  }
+
+  /**
+   * Get the information of all leaf tables that will be accessed by this table
+   * expression.
+   * 
+   * @param projection
+   *          The projection that is applied to the table expression.
+   */
+  public abstract List<LeafTableInfo> getLeafTables(
+      String projection);
+
+  /**
+   * Get the schema of the table.
+   * 
+   * @param conf
+   *          The configuration object.
+   */
+  public abstract Schema getSchema(Configuration conf) throws IOException;
+
+  /**
+   * Does this expression require a sorted split? If yes, all underlying
+   * BasicTables must be sorted and splits are generated by key sampling. If
+   * this method returns true, sortedSplitCapable() is expected to return true
+   * as well.
+   * 
+   * @return Whether this expression may only be split by key.
+   */
+  public boolean sortedSplitRequired() {
+    return sorted;
+  }
+
+  /**
+   * Mark this expression as requiring a sorted split.
+   */
+  public void setSortedSplit() {
+    sorted = true;
+  }
+
+  /**
+   * Is this expression capable of sorted split? If false, getScanner() should
+   * return null; otherwise, getScanner() should return a valid Scanner object.
+   * 
+   * This method should be overridden by subclasses that are capable of sorted
+   * split. Note that it should not perform any actual I/O, such as checking
+   * whether the leaf tables (BasicTables) are in fact sorted. If it returns
+   * true while at least one of the leaf tables is not sorted, an
+   * {@link IOException} will be thrown at split-generation time.
+   * 
+   * @return Whether the "table view" represented by the expression is sorted
+   *         and is thus splittable by key (sorted split).
+   */
+  public boolean sortedSplitCapable() {
+    return false;
+  }
+
+  /**
+   * Dump table information.
+   */
+  public final void dumpInfo(PrintStream ps, Configuration conf) throws IOException
+  {
+    dumpInfo(ps, conf, 0);
+  }
+  
+  /**
+   * Dump table information with the given indentation.
+   */
+  protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+}
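
The encode()/parse() pair above implements a simple self-describing serialization:
encode() writes the concrete class name followed by the subclass's parameters, and
parse() reads the name back, instantiates the class reflectively, and delegates to
decodeParam(). A round-trip sketch (illustrative, not part of this patch; it would
live in org.apache.hadoop.zebra.mapreduce since TableExpr is package-private):

    import java.io.IOException;
    import java.io.StringReader;

    class ExprRoundTripSketch {
      static TableExpr roundTrip(TableExpr expr) throws IOException {
        StringBuilder out = new StringBuilder();
        expr.encode(out); // class name, then the subclass's parameters
        // parse() instantiates the named class reflectively (so concrete
        // subclasses need a no-arg constructor), then calls decodeParam().
        return TableExpr.parse(new StringReader(out.toString()));
      }
    }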

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExprUtils.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+/**
+ * Utility methods for people interested in writing their own TableExpr classes.
+ */
+class TableExprUtils {
+  private TableExprUtils() {
+    // prevent instantiation
+  }
+
+  /**
+   * Encode a string into a StringBuilder.
+   * 
+   * @param out
+   *          output string builder.
+   * @param s
+   *          String to be encoded
+   */
+  public static void encodeString(StringBuilder out, String s) {
+    if (s == null) {
+      encodeInt(out, null);
+      return;
+    }
+    
+    encodeInt(out, s.length());
+    out.append(s);
+  }
+  
+  /**
+   * Decode a string previously encoded through encodeString.
+   * 
+   * @param in
+   *          The input StringReader.
+   * @return The decoded string object.
+   * @throws IOException
+   */
+  public static String decodeString(StringReader in) throws IOException {
+    Integer len = decodeInt(in);
+    if (len == null) {
+      return null;
+    }
+
+    char[] chars = new char[len];
+    in.read(chars);
+    return new String(chars);
+  }
+  
+  /**
+   * Encode a non-negative Long (or null) followed by a ':' terminator.
+   */
+  public static void encodeLong(StringBuilder out, Long l) {
+    if (l != null) {
+      out.append(l);
+    }
+    out.append(':');
+  }
+
+  /**
+   * Decode a Long previously written by encodeLong. A bare ':' decodes to
+   * null; negative values are not supported by this encoding.
+   */
+  public static Long decodeLong(StringReader in) throws IOException {
+    int c = in.read();
+    if (c == ':') {
+      return null;
+    }
+    if (!Character.isDigit(c)) {
+      throw new IllegalArgumentException("Bad encoded string");
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append((char) c);
+
+    boolean colonSeen = false;
+    while ((c = in.read()) != -1) {
+      if (c == ':') {
+        colonSeen = true;
+        break;
+      }
+      if (!Character.isDigit(c)) {
+        break;
+      }
+      sb.append((char) c);
+    }
+    if (!colonSeen) {
+      throw new IllegalArgumentException("Bad encoded string");
+    }
+    return Long.valueOf(sb.toString());
+  }
+  
+  /**
+   * Encode an Integer (or null) using the Long encoding above.
+   */
+  public static void encodeInt(StringBuilder out, Integer i) {
+    if (i == null) {
+      encodeLong(out, null);
+    }
+    else {
+      encodeLong(out, (long) i.intValue());
+    }
+  }
+
+  /**
+   * Decode an Integer previously written by encodeInt.
+   */
+  public static Integer decodeInt(StringReader in) throws IOException {
+    Long l = decodeLong(in);
+    if (l == null) return null;
+    return (int) l.longValue();
+  }
+}
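
To make the wire format concrete: a string is written as its character count, a ':'
terminator, then the raw characters, and null is written as a bare ':'. A runnable
example follows (illustrative, not part of this patch; it would sit in
org.apache.hadoop.zebra.mapreduce since TableExprUtils is package-private):

    import java.io.IOException;
    import java.io.StringReader;

    class EncodingSketch {
      public static void main(String[] args) throws IOException {
        StringBuilder out = new StringBuilder();
        TableExprUtils.encodeString(out, "hello"); // appends "5:hello"
        TableExprUtils.encodeString(out, null);    // appends ":"
        TableExprUtils.encodeInt(out, 42);         // appends "42:"
        System.out.println(out);                   // 5:hello:42:
        StringReader in = new StringReader(out.toString());
        System.out.println(TableExprUtils.decodeString(in)); // hello
        System.out.println(TableExprUtils.decodeString(in)); // null
        System.out.println(TableExprUtils.decodeInt(in));    // 42
      }
    }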