Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC

svn commit: r909667 [2/9] - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,1021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.BlockDistribution;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.BasicTable.Reader;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RowSplit;
+import org.apache.hadoop.zebra.mapreduce.TableExpr.LeafTableInfo;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * {@link org.apache.hadoop.mapreduce.InputFormat} class for reading one or more
+ * BasicTables.
+ * 
+ * Usage Example:
+ * <p>
+ * In the main program, add the following code.
+ * 
+ * <pre>
+ * job.setInputFormatClass(TableInputFormat.class);
+ * TableInputFormat.setInputPaths(job, new Path(&quot;path/to/table1&quot;), new Path(&quot;path/to/table2&quot;));
+ * TableInputFormat.setProjection(job, ZebraProjection.createZebraProjection(&quot;Name, Salary, BonusPct&quot;));
+ * </pre>
+ * 
+ * The above code does the following things:
+ * <UL>
+ * <LI>Set the input format class to TableInputFormat.
+ * <LI>Set the paths to the BasicTables to be consumed by the user's Mapper code.
+ * <LI>Set the projection on the input tables. In this case, the Mapper code is
+ * only interested in three fields: "Name", "Salary", and "BonusPct" (perhaps
+ * for the purpose of calculating each person's total payout). If no
+ * projection is specified, then all columns from the input tables will be
+ * retrieved. If the input tables have different schemas, then the input contains
+ * the union of all columns from all the input tables. Absent fields will be
+ * left as null in the input tuple.
+ * </UL>
+ * The user Mapper code should look like the following:
+ * 
+ * <pre>
+ * static class MyMapClass extends Mapper&lt;BytesWritable, Tuple, K, V&gt; {
+ *   // indices of the projected fields in the input Tuple. The fields appear
+ *   // in the tuple in projection order: &quot;Name, Salary, BonusPct&quot;.
+ *   int idxName, idxSalary, idxBonusPct;
+ *
+ *   &#064;Override
+ *   protected void setup(Context context) {
+ *     // determine the field indices from the projection order.
+ *     idxName = 0;
+ *     idxSalary = 1;
+ *     idxBonusPct = 2;
+ *   }
+ *
+ *   &#064;Override
+ *   protected void map(BytesWritable key, Tuple value, Context context)
+ *       throws IOException, InterruptedException {
+ *     try {
+ *       String name = (String) value.get(idxName);
+ *       int salary = (Integer) value.get(idxSalary);
+ *       double bonusPct = (Double) value.get(idxBonusPct);
+ *       // do something with the input data
+ *     } catch (ExecException e) {
+ *       e.printStackTrace();
+ *     }
+ *   }
+ * }
+ * </pre>
+ * 
+ * A little more explanation of the PIG {@link Tuple} objects: a Tuple is an
+ * ordered list of PIG datum objects. The permitted PIG datum types can be
+ * categorized as Scalar types and Composite types.
+ * <p>
+ * Supported Scalar types include seven native Java types: Boolean, Byte,
+ * Integer, Long, Float, Double, String, as well as one PIG class called
+ * {@link DataByteArray} that represents a type-less byte array.
+ * <p>
+ * Supported Composite types include:
+ * <UL>
+ * <LI>{@link Map} : It is the same as the Java Map class, with the additional
+ * restriction that the key type must be one of the scalar types PIG recognizes,
+ * and the value type may be any of the scalar or composite types PIG understands.
+ * <LI>{@link DataBag} : A DataBag is a collection of Tuples.
+ * <LI>{@link Tuple} : Yes, Tuple itself can be a datum in another Tuple.
+ * </UL>
+ */
+public class TableInputFormat extends InputFormat<BytesWritable, Tuple> {
+  static Log LOG = LogFactory.getLog(TableInputFormat.class);
+  
+  private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
+  private static final String INPUT_PROJ = "mapreduce.lib.table.input.projection";
+  private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
+
+  /**
+   * Set the paths to the input table.
+   * 
+   * @param jobContext
+   *          JobContext object.
+   * @param paths
+   *          one or more paths to BasicTables. The InputFormat class will
+   *          produce splits on the "union" of these BasicTables.
+   */
+  public static void setInputPaths(JobContext jobContext, Path... paths) {
+    if (paths.length < 1) {
+      throw new IllegalArgumentException("Requiring at least one input path");
+    }
+    
+    Configuration conf = jobContext.getConfiguration();
+    
+    if (paths.length == 1) {
+      setInputExpr(conf, new BasicTableExpr(paths[0]));
+    }
+    else {
+      TableUnionExpr expr = new TableUnionExpr();
+      for (Path path : paths) {
+        expr.add(new BasicTableExpr(path));
+      }
+      setInputExpr(conf, expr);
+    }
+  }
+  
+  /**
+   * Set the input expression in the Configuration object.
+   * 
+   * @param conf
+   *          Configuration object.
+   * @param expr
+   *          The input table expression.
+   */
+  static void setInputExpr(Configuration conf, TableExpr expr) {
+    StringBuilder out = new StringBuilder();
+    expr.encode(out);
+    conf.set(INPUT_EXPR, out.toString());
+  }
+
+  static TableExpr getInputExpr(JobContext jobContext) throws IOException {
+    Configuration conf = jobContext.getConfiguration();
+    String expr = conf.get(INPUT_EXPR);
+    if (expr == null) {
+      // try setting from input path
+      Path[] paths = FileInputFormat.getInputPaths( jobContext );
+      if (paths != null) {
+        setInputPaths(jobContext, paths);
+      }
+      expr = conf.get(INPUT_EXPR);
+    }
+      
+    if (expr == null) {
+      throw new IllegalArgumentException("Input expression not defined.");
+    }
+    StringReader in = new StringReader(expr);
+    return TableExpr.parse(in);
+  }
+  
+  /**
+   * Get the schema of the input table expression.
+   * 
+   * @param jobContext
+   *          JobContext object.
+   * @return the schema of the table expression.
+   */
+  public static Schema getSchema(JobContext jobContext) throws IOException
+  {
+	  TableExpr expr = getInputExpr( jobContext );
+	  return expr.getSchema( jobContext.getConfiguration() );
+  }  
+  
+  /**
+   * Set the input projection in the JobContext object.
+   * 
+   * @param jobContext
+   *          JobContext object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass a null projection. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+   * @deprecated Use {@link #setProjection(JobContext, ZebraProjection)} instead.
+   */
+  public static void setProjection(JobContext jobContext, String projection) throws ParseException {
+	  setProjection( jobContext.getConfiguration(), projection );
+  }
+  
+  /**
+   * Set the input projection in the JobContext object.
+   * 
+   * @param conf
+   *          Configuration object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass a null projection. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+   */
+  private static void setProjection(Configuration conf, String projection) throws ParseException {
+    conf.set(INPUT_PROJ, Schema.normalize(projection));
+
+    // virtual source_table columns require sorted table
+    if (Projection.getVirtualColumnIndices(projection) != null && !getSorted( conf ))
+      throw new ParseException("The source_table virtual column is only available for sorted table unions.");
+  }
+
+  /**
+   * Set the input projection in the JobContext object.
+   * 
+   * @param jobContext
+   *          JobContext object.
+   * @param projection
+   *          A comma-separated list of column names. To select all columns,
+   *          pass a null projection. The syntax of the projection conforms to
+   *          the {@link Schema} string.
+   *
+   */
+  public static void setProjection(JobContext jobContext, ZebraProjection projection) throws ParseException {
+    /* validity check on projection */
+    Schema schema = null;
+    String normalizedProjectionString = Schema.normalize(projection.toString());
+    try {
+      schema = getSchema( jobContext );
+      new org.apache.hadoop.zebra.types.Projection(schema, normalizedProjectionString);
+    } catch (ParseException e) {
+      throw new ParseException("[" + projection + "] " + "is not a valid Zebra projection string " + e.getMessage());
+    } catch (IOException e) {
+      throw new ParseException("[" + projection + "] " + "is not a valid Zebra projection string " + e.getMessage());
+    }
+    
+    Configuration conf = jobContext.getConfiguration();
+    conf.set(INPUT_PROJ, normalizedProjectionString);
+
+    // virtual source_table columns require sorted table
+    if (Projection.getVirtualColumnIndices(projection.toString()) != null && !getSorted( conf ))
+      throw new ParseException("The source_table virtual column is only available for sorted table unions.");
+  }  
+
+  /**
+   * Get the projection from the JobContext
+   * 
+   * @param jobContext
+   *          The JobContext object
+   * @return The projection schema. If projection has not been defined, or is
+   *         not known at this time, null will be returned. Note that by the
+   *         time this method is called in Mapper code, the projection must
+   *         already be known.
+   * @throws IOException
+   *  
+   */
+  public static String getProjection(JobContext jobContext) throws IOException, ParseException {
+    Configuration conf = jobContext.getConfiguration();
+    String strProj = conf.get(INPUT_PROJ);
+    // TODO: need to be revisited
+    if (strProj != null) return strProj;
+    TableExpr expr = getInputExpr( jobContext );
+    if (expr != null) {
+      return expr.getSchema(conf).toProjectionString();
+    }
+    return null;
+  }
+      
+  /**
+   * Set the requirement for a sorted table.
+   *
+   * @param conf
+   *          Configuration object.
+   */
+  private static void setSorted(Configuration conf) {
+    conf.setBoolean(INPUT_SORT, true);
+  }
+  
+  /**
+   * Get the SortInfo object regarding a Zebra table
+   *
+   * @param jobContext
+   *          JobContext object
+   * @return the Zebra table's SortInfo; null if the table is unsorted.
+   */
+  public static SortInfo getSortInfo(JobContext jobContext) throws IOException
+  {
+    Configuration conf = jobContext.getConfiguration();
+    TableExpr expr = getInputExpr(jobContext);
+    SortInfo result = null;
+    int sortSize = 0;
+    if (expr instanceof BasicTableExpr)
+    {
+      BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+      SortInfo sortInfo = reader.getSortInfo();
+      reader.close();
+      result = sortInfo;
+    } else {
+      List<LeafTableInfo> leaves = expr.getLeafTables(null);
+      for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+      {
+        LeafTableInfo leaf = it.next();
+        BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+        SortInfo sortInfo = reader.getSortInfo();
+        reader.close();
+        if (sortSize == 0)
+        {
+          sortSize = sortInfo.size();
+          result = sortInfo;
+        } else if (sortSize != sortInfo.size()) {
+          throw new IOException("Tables of the table union do not possess the same sort property.");
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Require that the input be a sorted table or a sorted table union.
+   * 
+   * @param jobContext
+   *          JobContext object.
+   * @param sortInfo
+   *          ZebraSortInfo object containing sorting information.
+   *        
+   */
+  public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException {
+    TableExpr expr = getInputExpr( jobContext );
+    Configuration conf = jobContext.getConfiguration();
+    String comparatorName = null;
+    String[] sortcolumns = null;
+    if (sortInfo != null)
+    {
+      comparatorName = TFile.COMPARATOR_JCLASS + sortInfo.getComparator();
+      String sortColumnNames = sortInfo.getSortColumns();
+      if (sortColumnNames != null)
+        sortcolumns = sortColumnNames.trim().split(SortInfo.SORTED_COLUMN_DELIMITER);
+      if (sortcolumns == null)
+        throw new IllegalArgumentException("No sort columns specified.");
+    }
+
+    if (expr instanceof BasicTableExpr)
+    {
+      BasicTable.Reader reader = new BasicTable.Reader(((BasicTableExpr) expr).getPath(), conf);
+      SortInfo mySortInfo = reader.getSortInfo();
+      reader.close();
+      if (mySortInfo == null)
+        throw new IOException("The table is not sorted");
+      if (comparatorName == null)
+        // cheat the equals method's comparator comparison
+        comparatorName = mySortInfo.getComparator();
+      if (sortcolumns != null && !mySortInfo.equals(sortcolumns, comparatorName))
+      {
+        throw new IOException("The table is not properly sorted");
+      }
+      setSorted( conf );
+    } else {
+      List<LeafTableInfo> leaves = expr.getLeafTables(null);
+      for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
+      {
+        LeafTableInfo leaf = it.next();
+        BasicTable.Reader reader = new BasicTable.Reader(leaf.getPath(), conf);
+        SortInfo mySortInfo = reader.getSortInfo();
+        reader.close();
+        if (mySortInfo == null)
+          throw new IOException("The table is not sorted");
+        if (comparatorName == null)
+          comparatorName = mySortInfo.getComparator(); // use the first table's comparator as comparison base
+        if (sortcolumns == null)
+        {
+          sortcolumns = mySortInfo.getSortColumnNames();
+          comparatorName = mySortInfo.getComparator();
+        } else {
+          if (!mySortInfo.equals(sortcolumns, comparatorName))
+          {
+            throw new IOException("The table is not properly sorted");
+          }
+        }
+      }
+      // need key range input splits for sorted table union
+      setSorted( conf );
+    }
+  }
+  
+  /**
+   * Get the requirement for a sorted table.
+   *
+   * @param conf Configuration object.
+   */
+  private static boolean getSorted(Configuration conf) {
+    return conf.getBoolean(INPUT_SORT, false);
+  }
+
+    /**
+     * @see InputFormat#createRecordReader(InputSplit, TaskAttemptContext)
+     */
+    @Override
+    public RecordReader<BytesWritable, Tuple> createRecordReader(InputSplit split,
+    		TaskAttemptContext taContext) throws IOException,InterruptedException {
+        return createRecordReaderFromJobContext( split, taContext );
+    }
+
+    private RecordReader<BytesWritable, Tuple> createRecordReaderFromJobContext(InputSplit split,
+                JobContext jobContext) throws IOException,InterruptedException {
+        Configuration conf = jobContext.getConfiguration();
+
+    	TableExpr expr = getInputExpr( conf );
+    	if (expr == null) {
+    		throw new IOException("Table expression not defined");
+    	}
+
+    	if ( getSorted( conf ) )
+    		expr.setSortedSplit();
+
+    	String strProj = conf.get(INPUT_PROJ);
+    	String projection = null;
+    	try {
+    		if (strProj == null) {
+    			projection = expr.getSchema(conf).toProjectionString();
+    			ZebraProjection proj = ZebraProjection.createZebraProjection( projection );
+    			TableInputFormat.setProjection( jobContext, proj );
+    		} else {
+    			projection = strProj;
+    		}
+    	} catch (ParseException e) {
+    		throw new IOException("Projection parsing failed : "+e.getMessage());
+    	}
+
+    	try {
+    		return new TableRecordReader(expr, projection, split, jobContext);
+    	} catch (ParseException e) {
+    		throw new IOException("Projection parsing failed : "+e.getMessage());
+    	}
+    }
+  
+    private static TableExpr getInputExpr(Configuration conf) throws IOException {
+    	String expr = conf.get(INPUT_EXPR);
+    	if (expr == null) {
+    		throw new IllegalArgumentException("Input expression not defined.");
+    	}
+    	StringReader in = new StringReader(expr);
+    	return TableExpr.parse(in);
+
+    }
+
+    /**
+     * Get a TableRecordReader on a single split
+     * 
+     * @param jobContext
+     *          JobContext object.
+     * @param projection
+     *          A comma-separated list of column names for the projection; null means all columns.
+     */
+    public static TableRecordReader createTableRecordReader(JobContext jobContext, String projection)
+    throws IOException, ParseException, InterruptedException {
+    	if (projection != null)
+    		setProjection( jobContext, ZebraProjection.createZebraProjection( projection ) );
+    	TableInputFormat inputFormat = new TableInputFormat();
+
+    	// a single split is needed
+    	List<InputSplit> splits = inputFormat.getSplits( jobContext, true );
+    	if( splits.size() != 1 )
+    		throw new IOException("Unable to generate a single split for the sorted table (internal error)" );
+    	return (TableRecordReader)inputFormat.createRecordReaderFromJobContext( splits.get( 0 ), jobContext );
+    }
+
+  private static List<InputSplit> getSortedSplits(Configuration conf, int numSplits,
+      TableExpr expr, List<BasicTable.Reader> readers,
+      List<BasicTableStatus> status) throws IOException {
+    if (expr.sortedSplitRequired() && !expr.sortedSplitCapable()) {
+      throw new IOException("Unable to create sorted splits");
+    }
+
+    long totalBytes = 0;
+    for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext();) {
+      BasicTableStatus s = it.next();
+      totalBytes += s.getSize();
+    }
+
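+    // At most one split per minimum-split-size worth of data; a numSplits of -1
+    // below means the split count is left to the key distribution.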
+    long maxSplits = totalBytes / getMinSplitSize( conf );
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    if (maxSplits == 0) {
+      numSplits = 1;
+    } else if (numSplits > maxSplits) {
+      numSplits = -1;
+    }
+
+    for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+      BasicTable.Reader reader = it.next();
+      if (!reader.isSorted()) {
+        throw new IOException("Attempting sorted split on unsorted table");
+      }
+    }
+
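+    // A single split covering the entire key range of all the tables.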
+    if (numSplits == 1) {
+      BlockDistribution bd = null;
+      for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+        BasicTable.Reader reader = it.next();
+        bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+      }
+      
+      SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
+      splits.add(split);
+      return splits;
+    }
+    
+    // TODO: Does it make sense to interleave keys for all leaf tables if
+    // numSplits <= 0 ?
+    int nLeaves = readers.size();
+    KeyDistribution keyDistri = null;
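+    // Merge the per-table key distributions; each leaf is sampled with extra buckets
+    // so the merged distribution can later be resized to the requested split count.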
+    for (int i = 0; i < nLeaves; ++i) {
+      KeyDistribution btKeyDistri =
+          readers.get(i).getKeyDistribution(
+              (numSplits <= 0) ? -1 :
+              Math.max(numSplits * 5 / nLeaves, numSplits));
+      keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+    }
+    
+    if (keyDistri == null) {
+      // should never happen.
+      return splits;
+    }
+    
+    if (numSplits > 0) {
+      keyDistri.resize(numSplits);
+    }
+    
+    RawComparable[] rawKeys = keyDistri.getKeys();
+    BytesWritable[] keys = new BytesWritable[rawKeys.length];
+    for (int i=0; i<keys.length; ++i) {
+      RawComparable rawKey = rawKeys[i];
+      keys[i] = new BytesWritable();
+      keys[i].setSize(rawKey.size());
+      System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].getBytes(), 0,
+          rawKey.size());
+    }
+    
+    // TODO: Should we change to RawComparable to avoid the creation of
+    // BytesWritables?
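+    // Each split covers the key range [previous boundary, current boundary); the
+    // first split has an open (null) begin and the last an open (null) end.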
+    for (int i = 0; i < keys.length; ++i) {
+      BytesWritable begin = (i == 0) ? null : keys[i - 1];
+      BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+      BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
+      SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+      splits.add(split);
+    }
+    return splits;
+  }
+  
+  static long getMinSplitSize(Configuration conf) {
+    return conf.getLong("table.input.split.minSize", 1 * 1024 * 1024L);
+  }
+
+  /**
+   * Set the minimum split size.
+   * 
+   * @param jobContext
+   *          The JobContext object.
+   * @param minSize
+   *          Minimum size.
+   */
+  public static void setMinSplitSize(JobContext jobContext, long minSize) {
+    jobContext.getConfiguration().setLong("table.input.split.minSize", minSize);
+  }
+  
+  private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
+    public DummyFileInputFormat(Job job, long minSplitSize) {
+      super.setMinInputSplitSize( job, minSplitSize );
+    }
+
+    @Override
+    public RecordReader<BytesWritable, Tuple> createRecordReader(
+        InputSplit split, TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      // no-op
+      return null;
+    }
+  }
+  
+  private static List<InputSplit> getRowSplits(Configuration conf,
+      TableExpr expr, List<BasicTable.Reader> readers, 
+      List<BasicTableStatus> status) throws IOException {
+    ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
+    Job job = new Job(conf);
+    DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
+
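+    // For each table, let FileInputFormat compute byte-range splits over the column
+    // group chosen for row-split, then map those byte ranges back to Zebra row splits.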
+    for (int i = 0; i < readers.size(); ++i) {
+      BasicTable.Reader reader = readers.get(i);
+      /* Get the index of the column group that will be used for row-split.*/
+      int splitCGIndex = reader.getRowSplitCGIndex();
+      
+      /* Input splits can be created only if a valid column group exists for the
+       * row split; otherwise no input splits are created for this table. */
+      if (splitCGIndex >= 0) {        
+        Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+        DummyFileInputFormat.setInputPaths(job, path);
+        PathFilter filter = reader.getPathFilter( job.getConfiguration() );
+        DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
+        
+        List<InputSplit> inputSplits = helper.getSplits( job );
+        
+        long starts[] = new long[inputSplits.size()];
+        long lengths[] = new long[inputSplits.size()];
+        Path paths[] = new Path [inputSplits.size()];
+        for (int j=0; j<inputSplits.size(); j++) {
+          FileSplit fileSplit = (FileSplit) inputSplits.get( j );
+          Path p = fileSplit.getPath();
+          long start = fileSplit.getStart();
+          long length = fileSplit.getLength();
+
+          starts[j] = start;
+          lengths[j] = length;
+          paths[j] = p;
+        }
+        
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
+  
+        for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
+          RowSplit subSplit = it.next();
+          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          ret.add(split);
+        }
+      }
+    }
+    
+    LOG.info("getSplits : returning " + ret.size() + " row splits.");
+    return ret;
+  }
+
+    /**
+     * @see InputFormat#getSplits(JobContext)
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+	    return getSplits( jobContext, false );
+    }
+
+    private List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException {
+    	Configuration conf = jobContext.getConfiguration();
+    	TableExpr expr = getInputExpr( conf );
+    	if( getSorted(conf) ) {
+    		expr.setSortedSplit();
+    	} else if( singleSplit ) {
+    		throw new IOException( "Table must be sorted in order to get a single sorted split" );
+    	}
+
+    	if( expr.sortedSplitRequired() && !expr.sortedSplitCapable() ) {
+    		throw new IOException( "Unable to create sorted splits" );
+    	}
+
+    	String projection;
+    	try {
+    		projection = getProjection(jobContext);
+    	} catch (ParseException e) {
+    		throw new IOException("getProjection failed : "+e.getMessage());
+    	}
+    	List<LeafTableInfo> leaves = expr.getLeafTables(projection);
+    	int nLeaves = leaves.size();
+    	ArrayList<BasicTable.Reader> readers =
+    		new ArrayList<BasicTable.Reader>(nLeaves);
+    	ArrayList<BasicTableStatus> status =
+    		new ArrayList<BasicTableStatus>(nLeaves);
+
+    	try {
+    		for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
+    			LeafTableInfo leaf = it.next();
+    			BasicTable.Reader reader =
+    				new BasicTable.Reader(leaf.getPath(), conf );
+    			reader.setProjection(leaf.getProjection());
+    			BasicTableStatus s = reader.getStatus();
+    			readers.add(reader);
+    			status.add(s);
+    		}
+
+    		if( readers.isEmpty() ) {
+    			// I think we should throw exception here.
+    			return new ArrayList<InputSplit>();
+    		}
+
+    		if( expr.sortedSplitRequired() ) {
+    			return singleSplit ? getSortedSplits( conf, 1, expr, readers, status )
+    					: getSortedSplits( conf, -1, expr, readers, status );
+    		}
+    		return getRowSplits( conf, expr, readers, status );
+    	} catch (ParseException e) {
+    		throw new IOException("Projection parsing failed : "+e.getMessage());
+    	} finally {
+    		for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
+    			try {
+    				it.next().close();
+    			}
+    			catch (Exception e) {
+    				e.printStackTrace();
+    				// TODO: log the error here.
+    			}
+    		}
+    	}
+    }
+
+  @Deprecated
+  public synchronized void validateInput(JobContext jobContext) throws IOException {
+    // Validate the input by opening all the tables.
+    TableExpr expr = getInputExpr(jobContext);
+    try {
+      String projection = getProjection(jobContext);
+      List<LeafTableInfo> leaves = expr.getLeafTables(projection);
+      Iterator<LeafTableInfo> iterator = leaves.iterator();
+      while (iterator.hasNext()) {
+        LeafTableInfo leaf = iterator.next();
+        BasicTable.Reader reader =
+            new BasicTable.Reader(leaf.getPath(), jobContext.getConfiguration());
+        reader.setProjection(projection);
+        reader.close();
+      }
+    } catch (ParseException e) {
+    	throw new IOException("Projection parsing failed : "+e.getMessage());
+    }
+  }
+}
+
+/**
+ * Adaptor class for sorted InputSplit for table.
+ */
+class SortedTableSplit extends InputSplit implements Writable {
+
+  BytesWritable begin = null, end = null;
+
+  String[] hosts;
+  long length = 1;
+
+  public SortedTableSplit()
+  {
+    // no-op for Writable construction
+  }
+  
+  public SortedTableSplit(BytesWritable begin, BytesWritable end,
+      BlockDistribution bd, Configuration conf) {
+    if (begin != null) {
+      this.begin = new BytesWritable();
+      this.begin.set(begin.getBytes(), 0, begin.getLength());
+    }
+    if (end != null) {
+      this.end = new BytesWritable();
+      this.end.set(end.getBytes(), 0, end.getLength());
+    }
+    
+    if (bd != null) {
+      length = bd.getLength();
+      hosts =
+        bd.getHosts( conf.getInt("mapred.lib.table.input.nlocation", 5) );
+    }
+  }
+  
+  @Override
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    if (hosts == null)
+    {
+      String[] tmp = new String[1];
+      tmp[0] = "";
+      return tmp;
+    }
+    return hosts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
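+    // A leading vint of 1 marks a serialized begin/end key; 0 means that side of the range is open.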
+    begin = end = null;
+    int bool = WritableUtils.readVInt(in);
+    if (bool == 1) {
+      begin = new BytesWritable();
+      begin.readFields(in);
+    }
+    bool = WritableUtils.readVInt(in);
+    if (bool == 1) {
+      end = new BytesWritable();
+      end.readFields(in);
+    }
+    length = WritableUtils.readVLong(in);
+    int size = WritableUtils.readVInt(in);
+    if (size > 0)
+      hosts = new String[size];
+    for (int i = 0; i < size; i++)
+    	hosts[i] = WritableUtils.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (begin == null) {
+      WritableUtils.writeVInt(out, 0);
+    }
+    else {
+      WritableUtils.writeVInt(out, 1);
+      begin.write(out);
+    }
+    if (end == null) {
+      WritableUtils.writeVInt(out, 0);
+    }
+    else {
+      WritableUtils.writeVInt(out, 1);
+      end.write(out);
+    }
+    WritableUtils.writeVLong(out, length);
+    WritableUtils.writeVInt(out, hosts == null ? 0 : hosts.length);
+    if (hosts != null) {
+      for (int i = 0; i < hosts.length; i++) {
+        WritableUtils.writeString(out, hosts[i]);
+      }
+    }
+  }
+  
+  public BytesWritable getBegin() {
+    return begin;
+  }
+
+  public BytesWritable getEnd() {
+    return end;
+  }
+}
+
+/**
+ * Adaptor class for unsorted InputSplit for table.
+ */
+class UnsortedTableSplit extends InputSplit implements Writable {
+  String path = null;
+  RangeSplit split = null;
+  String[] hosts = null;
+  long length = 1;
+
+  public UnsortedTableSplit(Reader reader, RangeSplit split, Configuration conf)
+      throws IOException {
+    this.path = reader.getPath();
+    this.split = split;
+    BlockDistribution dataDist = reader.getBlockDistribution(split);
+    if (dataDist != null) {
+      length = dataDist.getLength();
+      hosts =
+          dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5));
+    }
+  }
+  
+  public UnsortedTableSplit() {
+    // no-op for Writable construction
+  }
+  
+  @Override
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    if (hosts == null)
+    {
+      String[] tmp = new String[1];
+      tmp[0] = "";
+      return tmp;
+    }
+    return hosts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    path = WritableUtils.readString(in);
+    int bool = WritableUtils.readVInt(in);
+    if (bool == 1) {
+      if (split == null) split = new RangeSplit();
+      split.readFields(in);
+    }
+    else {
+      split = null;
+    }
+    hosts = WritableUtils.readStringArray(in);
+    length = WritableUtils.readVLong(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeString(out, path);
+    if (split == null) {
+      WritableUtils.writeVInt(out, 0);
+    }
+    else {
+      WritableUtils.writeVInt(out, 1);
+      split.write(out);
+    }
+    WritableUtils.writeStringArray(out, hosts);
+    WritableUtils.writeVLong(out, length);
+  }
+
+  public String getPath() {
+    return path;
+  }
+  
+  public RangeSplit getSplit() {
+    return split;
+  }
+}
+
+/**
+ * Adaptor class for unsorted InputSplit for table.
+ */
+class RowTableSplit extends InputSplit implements Writable{
+  String path = null;
+  RowSplit split = null;
+  String[] hosts = null;
+  long length = 1;
+
+  public RowTableSplit(Reader reader, RowSplit split, Configuration conf)
+      throws IOException {
+    this.path = reader.getPath();
+    this.split = split;
+    BlockDistribution dataDist = reader.getBlockDistribution(split);
+    if (dataDist != null) {
+      length = dataDist.getLength();
+      hosts =
+          dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5));
+    }
+  }
+  
+  public RowTableSplit() {
+    // no-op for Writable construction
+  }
+  
+  @Override
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return hosts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    path = WritableUtils.readString(in);
+    int bool = WritableUtils.readVInt(in);
+    if (bool == 1) {
+      if (split == null) split = new RowSplit();
+      split.readFields(in);
+    }
+    else {
+      split = null;
+    }
+    hosts = WritableUtils.readStringArray(in);
+    length = WritableUtils.readVLong(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeString(out, path);
+    if (split == null) {
+      WritableUtils.writeVInt(out, 0);
+    }
+    else {
+      WritableUtils.writeVInt(out, 1);
+      split.write(out);
+    }
+    WritableUtils.writeStringArray(out, hosts);
+    WritableUtils.writeVLong(out, length);
+  }
+
+  public String getPath() {
+    return path;
+  }
+  
+  public RowSplit getSplit() {
+    return split;
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Adaptor class to implement RecordReader on top of Scanner.
+ */
+public class TableRecordReader extends RecordReader<BytesWritable, Tuple> {
+  private final TableScanner scanner;
+  private long count = 0;
+  private BytesWritable key = null;
+  private Tuple value = null;
+
+  /**
+   * 
+   * @param expr
+   *          Table expression
+   * @param projection
+   *          projection schema. Should never be null.
+   * @param split
+   *          the split to work on
+   * @param jobContext
+   *          JobContext object
+   * @throws IOException
+   */
+  public TableRecordReader(TableExpr expr, String projection,
+      InputSplit split, JobContext jobContext) throws IOException, ParseException {
+	Configuration conf = jobContext.getConfiguration();
+    if (expr.sortedSplitRequired()) {
+      SortedTableSplit tblSplit = (SortedTableSplit) split;
+      scanner =
+          expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
+              conf);
+    } else if (split != null && split instanceof RowTableSplit) {
+      RowTableSplit rowSplit = (RowTableSplit) split;
+      scanner = expr.getScanner(rowSplit, projection, conf);
+    }
+    else {
+      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
+      scanner = expr.getScanner(tblSplit, projection, conf);
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    scanner.close();
+  }
+
+  public long getPos() throws IOException {
+    return count;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return (float) (scanner.atEnd() ? 1.0 : 0);
+  }
+
+  /**
+   * Seek to the first row whose key equals, or immediately follows, the given
+   * key; only applicable to a sorted Zebra table.
+   *
+   * @param key
+   *          the key to seek on
+   */
+  public boolean seekTo(BytesWritable key) throws IOException {
+    return scanner.seekTo(key);
+  }
+  
+  /**
+   * Check if the end of the input has been reached
+   *
+   * @return true if the end of the input is reached
+   */
+  public boolean atEnd() throws IOException {
+	  return scanner.atEnd();
+  }
+
+	@Override
+	public BytesWritable getCurrentKey() throws IOException, InterruptedException {
+		return key;
+	}
+	
+	@Override
+	public Tuple getCurrentValue() throws IOException, InterruptedException {
+		return value;
+	}
+	
+	@Override
+	public void initialize(org.apache.hadoop.mapreduce.InputSplit arg0,
+			TaskAttemptContext arg1) throws IOException, InterruptedException {
+		// no-op
+	}
+	
+	@Override
+	public boolean nextKeyValue() throws IOException, InterruptedException {
+	    if( scanner.atEnd() ) {
+	    	key = null;
+	    	value = null;
+	        return false;
+	    }
+	    
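+	    // Lazily create and then reuse the key and value holders across calls.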
+	    if( key == null ) {
+	        key = new BytesWritable();
+	    }
+	    if (value == null) {
+	        value = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
+	    }
+	    scanner.getKey(key);
+	    scanner.getValue(value);
+	    scanner.advance();
+	    count++;
+	    return true;
+	}
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Table expression supporting a union of BasicTables.
+ * 
+ * @see <a href="doc-files/examples/ReadTableUnion.java">Usage example for
+ *      TableUnionExpr</a>
+ */
+class TableUnionExpr extends CompositeTableExpr {
+  /**
+   * Add another BasicTable into the table-union.
+   * 
+   * @param expr
+   *          The expression for the BasicTable to be added.
+   * @return self.
+   */
+  public TableUnionExpr add(BasicTableExpr expr) {
+    super.addCompositeTable(expr);
+    return this;
+  }
+
+  /**
+   * Add an array of BasicTables into the table-union.
+   * 
+   * @param exprs
+   *          the expressions representing the BasicTables to be added.
+   * @return self.
+   */
+  public TableUnionExpr add(BasicTableExpr[] exprs) {
+    super.addCompositeTables(exprs);
+    return this;
+  }
+
+  /**
+   * Add a Collection of BasicTables into the table-union.
+   * 
+   * @param exprs
+   *          the expressions representing the BasicTables to be added.
+   * @return self.
+   */
+  public TableUnionExpr add(Collection<? extends BasicTableExpr> exprs) {
+    super.addCompositeTables(exprs);
+    return this;
+  }
+  
+  @Override
+  protected TableUnionExpr decodeParam(StringReader in) throws IOException {
+    super.decodeParam(in);
+    int n = composite.size();
+    for (int i = 0; i < n; ++i) {
+      if (!(composite.get(i) instanceof BasicTableExpr)) {
+        throw new RuntimeException("Not a BasicTableExpr");
+      }
+    }
+    return this;
+  }
+
+  @Override
+  protected TableUnionExpr encodeParam(StringBuilder out) {
+    super.encodeParam(out);
+    return this;
+  }
+
+  @Override
+  public TableScanner getScanner(BytesWritable begin, BytesWritable end,
+      String projection, Configuration conf) throws IOException {
+    int n = composite.size();
+    if (n==0) {
+      throw new IllegalArgumentException("Union of zero tables");
+    }
+    ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
+    final ArrayList<BasicTableStatus> status =
+        new ArrayList<BasicTableStatus>(n);
+    for (int i = 0; i < n; ++i) {
+      BasicTableExpr expr = (BasicTableExpr) composite.get(i);
+      BasicTable.Reader reader =
+          new BasicTable.Reader(expr.getPath(), conf);
+      readers.add(reader);
+      status.add(reader.getStatus());
+    }
+
+    String actualProjection = projection;
+    if (actualProjection == null) {
+      // Perform a union on all column names.
+      LinkedHashSet<String> colNameSet = new LinkedHashSet<String>();
+      for (int i = 0; i < n; ++i) {
+        String[] cols = readers.get(i).getSchema().getColumns();
+        for (String col : cols) {
+          colNameSet.add(col);
+        }
+      }
+
+      actualProjection = 
+          Projection.getProjectionStr(colNameSet.toArray(new String[colNameSet.size()]));
+    }
+    
+    ArrayList<TableScanner> scanners = new ArrayList<TableScanner>(n);
+    try {
+      for (int i=0; i<n; ++i) {
+        BasicTable.Reader reader = readers.get(i);
+        reader.setProjection(actualProjection);
+        TableScanner scanner = readers.get(i).getScanner(begin, end, true);
+        scanners.add(scanner);
+      }
+    } catch (ParseException e) {
+    	throw new IOException("Projection parsing failed : "+e.getMessage());
+    }
+    
+    if (scanners.isEmpty()) {
+      return new NullScanner(actualProjection);
+    }
+
+    Integer[] virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+    if (virtualColumnIndices != null && n == 1)
+      throw new IllegalArgumentException("virtual column requires union of multiple tables");
+    return new SortedTableUnionScanner(scanners, virtualColumnIndices);
+  }
+}
+
+/**
+ * A scanner that merge-reads multiple sorted table scanners and returns rows in
+ * globally sorted key order.
+ */
+class SortedTableUnionScanner implements TableScanner {
+  CachedTableScanner[] scanners;
+  PriorityBlockingQueue<CachedTableScanner> queue;
+  boolean synced = false;
+  boolean hasVirtualColumns = false;
+  Integer[] virtualColumnIndices = null;
+  CachedTableScanner scanner = null; // the working scanner
+
+  SortedTableUnionScanner(List<TableScanner> scanners, Integer[] vcolindices) throws IOException {
+    if (scanners.isEmpty()) {
+      throw new IllegalArgumentException("Zero-sized table union");
+    }
+    
+    this.scanners = new CachedTableScanner[scanners.size()];
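+    // Order the scanners by their current key so that poll() always yields the
+    // scanner holding the smallest key across all tables.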
+    queue =
+        new PriorityBlockingQueue<CachedTableScanner>(scanners.size(),
+            new Comparator<CachedTableScanner>() {
+
+              @Override
+              public int compare(CachedTableScanner o1, CachedTableScanner o2) {
+                try {
+                  return o1.getKey().compareTo(o2.getKey());
+                }
+                catch (IOException e) {
+                  throw new RuntimeException("IOException: " + e.toString());
+                }
+              }
+
+            });
+    
+    for (int i = 0; i < this.scanners.length; ++i) {
+      TableScanner scanner = scanners.get(i);
+      this.scanners[i] = new CachedTableScanner(scanner, i);
+    }
+    // initial fill-ins
+    if (!atEnd())
+    	scanner = queue.poll();
+    virtualColumnIndices = vcolindices;
+    hasVirtualColumns = (vcolindices != null && vcolindices.length != 0);
+  }
+  
+
+  private void sync() throws IOException {
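+    // Lazily (re)build the queue from every scanner that still has rows.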
+    if (!synced) {
+      queue.clear();
+      for (int i = 0; i < scanners.length; ++i) {
+        if (!scanners[i].atEnd()) {
+          queue.add(scanners[i]);
+        }
+      }
+      synced = true;
+    }
+  }
+  
+  @Override
+  public boolean advance() throws IOException {
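+    // Advance the current scanner; if it still has rows, re-queue it, then take the
+    // scanner with the next smallest key as the new working scanner.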
+    sync();
+    scanner.advance();
+    if (!scanner.atEnd()) {
+      queue.add(scanner);
+    }
+    scanner = queue.poll();
+    return (scanner != null);
+  }
+
+  @Override
+  public boolean atEnd() throws IOException {
+    sync();
+    return (scanner == null && queue.isEmpty());
+  }
+
+  @Override
+  public String getProjection() {
+    return scanners[0].getProjection();
+  }
+  
+  @Override
+  public Schema getSchema() {
+    return scanners[0].getSchema();
+  }
+
+  @Override
+  public void getKey(BytesWritable key) throws IOException {
+    if (atEnd()) {
+      throw new EOFException("No more rows to read");
+    }
+    
+    key.set(scanner.getKey());
+  }
+
+  @Override
+  public void getValue(Tuple row) throws IOException {
+    if (atEnd()) {
+      throw new EOFException("No more rows to read");
+    }
+  
+    Tuple tmp = scanner.getValue();
+    if (hasVirtualColumns)
+    {
+       for (int i = 0; i < virtualColumnIndices.length; i++)
+       {
+         tmp.set(virtualColumnIndices[i], scanner.getIndex());
+       }
+    }
+    row.reference(tmp);
+  }
+
+  @Override
+  public boolean seekTo(BytesWritable key) throws IOException {
+    boolean rv = false;
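+    // Seek on every scanner so each one is positioned, regardless of earlier results.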
+    for (CachedTableScanner scanner : scanners) {
+      rv = scanner.seekTo(key) || rv;
+    }
+    synced = false;
+    if (!atEnd())
+      scanner = queue.poll();
+    return rv;
+  }
+
+  @Override
+  public void seekToEnd() throws IOException {
+    for (CachedTableScanner scanner : scanners) {
+      scanner.seekToEnd();
+    }
+    scanner = null;
+    synced = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (CachedTableScanner scanner : scanners) {
+      scanner.close();
+    }
+    queue.clear();
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraOutputPartition.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.data.Tuple;
+
+
+public abstract class ZebraOutputPartition implements Configurable {
+
+  protected Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
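+  /**
+   * Return the index of the output partition (i.e. the output table) that the
+   * given record should be written to.
+   */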
+  public abstract int getOutputPartition(BytesWritable key, Tuple value)
+      throws IOException;
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraProjection.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+/**
+ * A wrapper class for Projection.
+ */
+public class ZebraProjection {
+  private String projectionStr = null;
+
+  private ZebraProjection(String projStr) {
+    projectionStr = projStr;
+  }
+  
+  public static ZebraProjection createZebraProjection(String projStr) {
+    return new ZebraProjection(projStr);
+  }
+  
+  public String toString() {
+    return projectionStr;
+  }
+}
\ No newline at end of file

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSchema.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import org.apache.hadoop.zebra.schema.Schema;
+
+/**
+ * A wrapper class for Schema.
+ */
+public class ZebraSchema {
+  private String schemaStr = null;
+
+  private ZebraSchema(String str) {
+    String normalizedStr = Schema.normalize(str);
+    schemaStr = normalizedStr; 
+  }
+  
+  public static ZebraSchema createZebraSchema(String str) {
+    return new ZebraSchema(str);
+  }
+  
+  public String toString() {
+    return schemaStr;
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraSortInfo.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.zebra.tfile.TFile;
+
+/**
+ * A wrapper class for SortInfo.
+ */
+public class ZebraSortInfo {
+  private String sortColumnsStr = null;
+  private String comparatorStr = null;
+  
+  private ZebraSortInfo(String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+    if (comparatorClass != null)
+      comparatorStr = TFile.COMPARATOR_JCLASS + comparatorClass.getName();
+    sortColumnsStr = sortColumns;
+  }
+  
+  public static ZebraSortInfo createZebraSortInfo(String sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
+    return new ZebraSortInfo(sortColumns, comparatorClass);
+  }
+  
+  public String getSortColumns() {
+    return sortColumnsStr;
+  }
+
+  public String getComparator() {
+    return comparatorStr;
+  }    
+}
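
Because ZebraSortInfo records the comparator only as TFile.COMPARATOR_JCLASS followed by the class name, a custom comparator has to be a RawComparator implementation that is available on the job classpath. A hedged sketch follows; the comparator class and its reverse byte ordering are made up for illustration and are not part of this commit:

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator to pair with ZebraSortInfo.
public class ReverseBytesComparator implements RawComparator<Object> {
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // Reverse of the default lexicographic byte ordering.
    return -WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
  }

  @Override
  public int compare(Object o1, Object o2) {
    throw new UnsupportedOperationException("raw byte comparison only");
  }
}

// Usage:
//   ZebraSortInfo sortInfo =
//       ZebraSortInfo.createZebraSortInfo("word", ReverseBytesComparator.class);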

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/ZebraStorageHint.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+/**
+ * A wrapper class for StorageHint.
+ */
+public class ZebraStorageHint {
+  private String storageHintStr = null;
+
+  private ZebraStorageHint(String shStr) {
+    storageHintStr = shStr;
+  }
+  
+  public static ZebraStorageHint createZebraStorageHint(String shStr) {
+    return new ZebraStorageHint(shStr);
+  }
+  
+  public String toString() {
+    return storageHintStr;
+  }
+}
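
Taken together, ZebraSchema, ZebraStorageHint and ZebraSortInfo wrap the plain strings that describe a new table's layout. A minimal write-side sketch, assuming BasicTableOutputFormat in this package exposes setOutputPath(JobContext, Path) and setStorageInfo(JobContext, ZebraSchema, ZebraStorageHint, ZebraSortInfo); those setter names, the schema, the storage hint and the path are assumptions, not part of this diff:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;

public class ZebraOutputSetupSketch {
  public static void configure(Job job) throws Exception {
    ZebraSchema schema = ZebraSchema.createZebraSchema("word:string, count:int");
    ZebraStorageHint hint = ZebraStorageHint.createZebraStorageHint("[word];[count]");
    // A null comparator class simply leaves the comparator string unset.
    ZebraSortInfo sortInfo = ZebraSortInfo.createZebraSortInfo("word", null);

    job.setOutputFormatClass(BasicTableOutputFormat.class);
    BasicTableOutputFormat.setOutputPath(job, new Path("/tmp/wordcount-table")); // assumed setter
    BasicTableOutputFormat.setStorageInfo(job, schema, hint, sortInfo);          // assumed setter
  }
}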

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/package-info.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Provides {@link org.apache.hadoop.mapreduce.InputFormat} and
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} adaptor classes for Hadoop
+ * Zebra Table.
+ * 
+ * This package is based on org.apache.hadoop.mapred, adapted to the Hadoop 20 API. The
+ * original package is now deprecated.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
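
On the read side, the same wrapper style applies to projections. A minimal input-setup sketch, assuming TableInputFormat in this package exposes setInputPaths(JobContext, Path...) and setProjection(JobContext, ZebraProjection); those setter names, the table path and the column list are assumptions, not part of this diff:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.mapreduce.ZebraProjection;

public class ZebraInputSetupSketch {
  public static void configure(Job job) throws Exception {
    job.setInputFormatClass(TableInputFormat.class);
    TableInputFormat.setInputPaths(job, new Path("/tmp/wordcount-table")); // assumed setter
    // Restrict the scan to the columns the mapper actually needs.
    TableInputFormat.setProjection(job,
        ZebraProjection.createZebraProjection("word, count"));             // assumed setter
  }
}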

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/package-info.java Sat Feb 13 00:06:15 2010
@@ -39,16 +39,16 @@
  * index-based access).
  * <p>
  * Typically, applications use
- * {@link org.apache.hadoop.zebra.mapred.BasicTableOutputFormat} (which implements
+ * {@link org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat} (which implements
  * the Hadoop {@link org.apache.hadoop.mapred.OutputFormat} interface) to create
  * BasicTables through MapReduce. And they use
- * {@link org.apache.hadoop.zebra.mapred.TableInputFormat} (which implements the
+ * {@link org.apache.hadoop.zebra.mapreduce.TableInputFormat} (which implements the
 * Hadoop {@link org.apache.hadoop.mapred.InputFormat}) to feed the data as their
  * MapReduce input.
  * <p>
  * The API is structured in three packages:
  * <UL>
- * <LI> {@link org.apache.hadoop.zebra.mapred} : The MapReduce layer. It contains
+ * <LI> {@link org.apache.hadoop.zebra.mapreduce} : The MapReduce layer. It contains
  * two classes: BasicTableOutputFormat for creating BasicTable; and
 * TableInputFormat for reading tables.
  * 

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java?rev=909667&r1=909666&r2=909667&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Sat Feb 13 00:06:15 2010
@@ -18,98 +18,163 @@
 
 package org.apache.hadoop.zebra.pig;
 
-import java.util.Iterator;
-
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.schema.Schema.ColumnSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 class SchemaConverter {
-  public static ColumnType toTableType(byte ptype)
-  {
-    ColumnType ret;
-    switch (ptype) {
-      case DataType.INTEGER:
-        ret = ColumnType.INT; 
-        break;
-      case DataType.LONG:
-        ret = ColumnType.LONG; 
-        break;
-      case DataType.FLOAT:
-        ret = ColumnType.FLOAT; 
-        break;
-      case DataType.DOUBLE:
-        ret = ColumnType.DOUBLE; 
-        break;
-      case DataType.BOOLEAN:
-        ret = ColumnType.BOOL; 
-        break;
-      case DataType.BAG:
-        ret = ColumnType.COLLECTION; 
-        break;
-      case DataType.MAP:
-        ret = ColumnType.MAP; 
-        break;
-      case DataType.TUPLE:
-        ret = ColumnType.RECORD; 
-        break;
-      case DataType.CHARARRAY:
-        ret = ColumnType.STRING; 
-        break;
-      case DataType.BYTEARRAY:
-        ret = ColumnType.BYTES; 
-        break;
-      default:
-        ret = null;
+    public static ColumnType toTableType(byte ptype) {
+        ColumnType ret;
+        switch (ptype) {
+        case DataType.INTEGER:
+            ret = ColumnType.INT; 
+            break;
+        case DataType.LONG:
+            ret = ColumnType.LONG; 
+            break;
+        case DataType.FLOAT:
+            ret = ColumnType.FLOAT; 
+            break;
+        case DataType.DOUBLE:
+            ret = ColumnType.DOUBLE; 
+            break;
+        case DataType.BOOLEAN:
+            ret = ColumnType.BOOL; 
+            break;
+        case DataType.BAG:
+            ret = ColumnType.COLLECTION; 
+            break;
+        case DataType.MAP:
+            ret = ColumnType.MAP; 
+            break;
+        case DataType.TUPLE:
+            ret = ColumnType.RECORD; 
+            break;
+        case DataType.CHARARRAY:
+            ret = ColumnType.STRING; 
+            break;
+        case DataType.BYTEARRAY:
+            ret = ColumnType.BYTES; 
+            break;
+        default:
+            ret = null;
         break;
+        }
+        return ret;
+    }
+
+    public static Schema toPigSchema(
+            org.apache.hadoop.zebra.schema.Schema tschema)
+    throws FrontendException {
+        Schema ret = new Schema();
+        for (String col : tschema.getColumns()) {
+            org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema = 
+                tschema.getColumn(col);
+            if (columnSchema != null) {
+                ColumnType ct = columnSchema.getType();
+                if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
+                        ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
+                    ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
+                else
+                    ret.add(new FieldSchema(col, ct.pigDataType()));
+            } else {
+                ret.add(new FieldSchema(null, null));
+            }
+        }
+        return ret;
     }
-    return ret;
-  }
-  
-  public static Schema toPigSchema(
-      org.apache.hadoop.zebra.schema.Schema tschema)
-      throws FrontendException {
-    Schema ret = new Schema();
-    for (String col : tschema.getColumns()) {
-    	org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema = 
-    		tschema.getColumn(col);
-			if (columnSchema != null) {
-        ColumnType ct = columnSchema.getType();
-        if (ct == org.apache.hadoop.zebra.schema.ColumnType.RECORD ||
-            ct == org.apache.hadoop.zebra.schema.ColumnType.COLLECTION)
-          ret.add(new FieldSchema(col, toPigSchema(columnSchema.getSchema()), ct.pigDataType()));
-        else
-          ret.add(new FieldSchema(col, ct.pigDataType()));
-			} else {
-				ret.add(new FieldSchema(null, null));
-			}
-    }
-    return ret;
-  }
-
-  public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
-      Schema pschema) throws FrontendException, ParseException {
-    org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
-    Schema.FieldSchema columnSchema;
-    for (int i = 0; i < pschema.size(); i++) {
-    	columnSchema = pschema.getField(i);
-    	if (columnSchema != null) {
-    		if (DataType.isSchemaType(columnSchema.type))
-    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
-    					fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
-    		else if (columnSchema.type == DataType.MAP)
-    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
-    					new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, 
-    							org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
-    		else
-    			tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
-		  } else {
-		  	tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
-		  }
+
+    public static org.apache.hadoop.zebra.schema.Schema fromPigSchema(
+            Schema pschema) throws FrontendException, ParseException {
+        org.apache.hadoop.zebra.schema.Schema tschema = new org.apache.hadoop.zebra.schema.Schema();
+        Schema.FieldSchema columnSchema;
+        for (int i = 0; i < pschema.size(); i++) {
+            columnSchema = pschema.getField(i);
+            if (columnSchema != null) {
+                if (DataType.isSchemaType(columnSchema.type))
+                    tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
+                            fromPigSchema(columnSchema.schema), toTableType(columnSchema.type)));
+                else if (columnSchema.type == DataType.MAP)
+                    tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, 
+                            new org.apache.hadoop.zebra.schema.Schema(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, 
+                                    org.apache.hadoop.zebra.schema.ColumnType.BYTES)), toTableType(columnSchema.type)));
+                else
+                    tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(columnSchema.alias, toTableType(columnSchema.type)));
+            } else {
+                tschema.add(new org.apache.hadoop.zebra.schema.Schema.ColumnSchema(null, ColumnType.ANY));
+            }
+        }
+        return tschema;
     }
-    return tschema;
-  }
+
+    public static org.apache.hadoop.zebra.schema.Schema convertFromResourceSchema(ResourceSchema rSchema)
+    throws ParseException {
+        if( rSchema == null )
+            return null;
+
+        org.apache.hadoop.zebra.schema.Schema schema = new org.apache.hadoop.zebra.schema.Schema();
+        ResourceSchema.ResourceFieldSchema[] fields = rSchema.getFields();
+        for( ResourceSchema.ResourceFieldSchema field : fields ) {
+            String name = field.getName();
+            ColumnType type = toTableType( field.getType() );
+            org.apache.hadoop.zebra.schema.Schema cSchema = convertFromResourceSchema( field.getSchema() );
+            if( type == ColumnType.MAP && cSchema == null ) {
+                cSchema = new org.apache.hadoop.zebra.schema.Schema();
+                cSchema.add( new org.apache.hadoop.zebra.schema.Schema.ColumnSchema( "", ColumnType.BYTES ) );
+            }
+            org.apache.hadoop.zebra.schema.Schema.ColumnSchema columnSchema = 
+                new org.apache.hadoop.zebra.schema.Schema.ColumnSchema( name, cSchema, type );
+            schema.add( columnSchema );
+        }
+
+        return schema;
+    }
+
+    public static ResourceSchema convertToResourceSchema(org.apache.hadoop.zebra.schema.Schema tSchema) {
+        if( tSchema == null )
+            return null;
+
+        ResourceSchema rSchema = new ResourceSchema();
+        int fieldCount = tSchema.getNumColumns();
+        ResourceFieldSchema[] rFields = new ResourceFieldSchema[fieldCount];
+        for( int i = 0; i < fieldCount; i++ ) {
+            org.apache.hadoop.zebra.schema.Schema.ColumnSchema cSchema = tSchema.getColumn( i );
+            if( cSchema != null ) 
+                rFields[i] = convertToResourceFieldSchema( cSchema );
+            else
+                rFields[i] = new ResourceFieldSchema();
+        }
+        rSchema.setFields( rFields );
+        return rSchema;
+    }
+
+    public static ResourceFieldSchema convertToResourceFieldSchema(
+            ColumnSchema cSchema) {
+        ResourceFieldSchema field = new ResourceFieldSchema();
+
+        if( cSchema.getType() == ColumnType.COLLECTION && cSchema.getSchema().getNumColumns() > 1 ) {
+            field.setType( ColumnType.RECORD.pigDataType() );
+            field.setSchema( convertToResourceSchema( cSchema.getSchema() ) );
+        } else if( cSchema.getType() == ColumnType.ANY && cSchema.getName().isEmpty() ) { // For anonymous column
+            field.setName( null );
+            field.setType( DataType.UNKNOWN );
+            field.setSchema( null );
+        } else {
+            field.setName( cSchema.getName() );
+            field.setType( cSchema.getType().pigDataType() );
+            if( cSchema.getType() == ColumnType.MAP )
+                field.setSchema( null );
+            else
+                field.setSchema( convertToResourceSchema( cSchema.getSchema() ) );
+        }
+
+        return field;
+    }
+
 }
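
A small sketch of the Pig-to-Zebra direction implemented by fromPigSchema above. SchemaConverter is package-private, so the sketch would have to live in org.apache.hadoop.zebra.pig; the field names and types are made up and the class is illustrative only, not part of this commit:

package org.apache.hadoop.zebra.pig;

import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

class SchemaConverterSketch {
  public static void main(String[] args) throws Exception {
    Schema pigSchema = new Schema();
    pigSchema.add(new FieldSchema("word", DataType.CHARARRAY)); // maps to ColumnType.STRING
    pigSchema.add(new FieldSchema("count", DataType.LONG));     // maps to ColumnType.LONG
    pigSchema.add(new FieldSchema("props", DataType.MAP));      // becomes a MAP with a BYTES value column
    org.apache.hadoop.zebra.schema.Schema zebraSchema = SchemaConverter.fromPigSchema(pigSchema);
    System.out.println(zebraSchema);
  }
}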