You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC

svn commit: r803312 [4/16] - in /hadoop/pig/trunk: ./ contrib/zebra/ contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/ contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/ contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/s...

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+//import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Pig LoadFunc and Slicer for Table
+ */
+public class TableLoader implements LoadFunc, Slicer {
+	static final Log LOG = LogFactory.getLog(TableLoader.class);
+	private TableInputFormat inputFormat;
+	private JobConf jobConf;
+	private String projectionString;
+	private Path[] paths;
+  
+	/**
+	 * default constructor
+	 */
+	public TableLoader() {
+		inputFormat = new TableInputFormat();
+	}
+
+	/**
+	 * @param projectionStr
+	 * 		  projection string passed from pig query.
+	 */
+	public TableLoader(String projectionStr) {
+		inputFormat = new TableInputFormat();
+		projectionString = projectionStr;	  
+	}
+
+	@Override
+	public void bindTo(String fileName, BufferedPositionedInputStream is,
+			long offset, long end) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	/**
+	 * Lazily builds the {@link JobConf} used by this loader. Does nothing when
+	 * a configuration has already been built by an earlier successful call.
+	 * <p>
+	 * The configuration and the decoded input paths are only published to the
+	 * {@code jobConf} and {@code paths} fields once every step has succeeded,
+	 * so a failed call does not leave a half-initialized configuration behind
+	 * that would make later calls skip validation.
+	 *
+	 * @param storage
+	 *        data storage whose configuration seeds the job configuration.
+	 * @param location
+	 *        The location format follows the same convention as
+	 *        FileInputFormat's comma-separated multiple path format.
+	 * @throws IOException
+	 *         if no input path resolves to a directory, or on file system errors.
+	 * @throws RuntimeException
+	 *         if the projection string cannot be parsed.
+	 */
+	private void checkConf(DataStorage storage, String location) throws IOException {
+		if (jobConf != null) {
+			return;
+		}
+
+		Configuration conf =
+			ConfigurationUtil.toConfiguration(storage.getConfiguration());
+		JobConf newConf = new JobConf(conf);
+		newConf.setInputFormat(TableInputFormat.class);
+		// TODO: the following code may better be moved to TableInputFormat.
+		// Hack: use FileInputFormat to decode comma-separated multiple path
+		// format.
+		FileInputFormat.setInputPaths(newConf, location);
+		Path[] inputPaths = FileInputFormat.getInputPaths(newConf);
+
+		// Expand glob patterns and keep only the matches that are directories.
+		List<Path> result = new ArrayList<Path>(inputPaths.length);
+		for (Path p : inputPaths) {
+			FileSystem fs = p.getFileSystem(newConf);
+			FileStatus[] matches = fs.globStatus(p);
+			if (matches == null) {
+				LOG.warn("Input path does not exist: " + p);
+			} else if (matches.length == 0) {
+				LOG.warn("Input Pattern " + p + " matches 0 files");
+			} else {
+				for (FileStatus globStat : matches) {
+					if (globStat.isDir()) {
+						result.add(globStat.getPath());
+					} else {
+						// Report the concrete match, not the pattern that produced it.
+						LOG.warn("Input path " + globStat.getPath() + " is not a directory");
+					}
+				}
+			}
+		}
+		if (result.isEmpty()) {
+			throw new IOException("No table specified for input");
+		}
+
+		LOG.info("Total input tables to process : " + result.size());
+		TableInputFormat.setInputPaths(newConf, result.toArray(new Path[result.size()]));
+		if (projectionString != null) {
+			try {
+				TableInputFormat.setProjection(newConf, projectionString);
+			} catch (ParseException e) {
+				// Keep the original exception as the cause for diagnosis.
+				throw new RuntimeException("Schema parsing failed : "+e.getMessage(), e);
+			}
+		}
+
+		// Publish only after everything above succeeded.
+		paths = inputPaths;
+		jobConf = newConf;
+	}
+  
+	/**
+	 * Determines the Pig schema of the data to load by applying the configured
+	 * projection to the table schema. When {@code fileName} contains no comma
+	 * it is treated as a single table; otherwise the schemas of all input
+	 * paths are unioned before the projection is applied.
+	 *
+	 * @param fileName table location(s), comma-separated for a table union.
+	 * @param execType not used by this implementation.
+	 * @param storage data storage used to build the job configuration.
+	 * @return the Pig schema corresponding to the projection schema.
+	 * @throws IOException if a schema cannot be read, parsed, unioned or
+	 *         converted, or if the projection schema is null.
+	 */
+	@Override
+	public Schema determineSchema(String fileName, ExecType execType,
+			DataStorage storage) throws IOException {
+		checkConf(storage, fileName);
+
+		org.apache.hadoop.zebra.types.Schema tableSchema;
+		if (!fileName.contains(",")) { // one table;
+			tableSchema = BasicTable.Reader.getSchema(new Path(fileName), jobConf);
+		} else { // table union;
+			tableSchema = new org.apache.hadoop.zebra.types.Schema();
+			for (Path p : paths) {
+				org.apache.hadoop.zebra.types.Schema schema = BasicTable.Reader.getSchema(p, jobConf);
+				try {
+					tableSchema.unionSchema(schema);
+				} catch (ParseException e) {
+					throw new IOException(e.getMessage(), e);
+				}
+			}
+		}
+
+		// Both cases build the projection the same way from the resolved schema.
+		org.apache.hadoop.zebra.types.Schema projectionSchema;
+		try {
+			Projection projection = new Projection(tableSchema, TableInputFormat.getProjection(jobConf));
+			projectionSchema = projection.getProjectionSchema();
+		} catch (ParseException e) {
+			throw new IOException("Schema parsing failed : "+e.getMessage(), e);
+		}
+
+		if (projectionSchema == null) {
+			throw new IOException("Cannot determine table projection schema");
+		}
+
+		try {
+			return SchemaConverter.toPigSchema(projectionSchema);
+		} catch (FrontendException e) {
+			throw new IOException("FrontendException", e);
+		}
+	}
+
+	/**
+	 * Re-applies the projection string given at construction time to the job
+	 * configuration. The {@code schema} argument is currently ignored.
+	 *
+	 * @param schema the fields Pig intends to read (unused).
+	 * @throws RuntimeException if the projection string cannot be parsed.
+	 */
+	@Override
+	public void fieldsToRead(Schema schema) {
+		// chaow: this function never gets triggered in pig loader/storer test cases;
+		LOG.debug("fieldsToRead is invoked.");
+
+		// TODO: derive the projection from the Pig schema passed in
+		// (e.g. via SchemaConverter.fromPigSchema) instead of reusing the
+		// constructor-supplied projection string.
+		if (projectionString == null) {
+			// Same guard as checkConf: no projection was requested, nothing to set.
+			return;
+		}
+		try {
+			TableInputFormat.setProjection(jobConf, projectionString);
+		} catch (ParseException e) {
+			throw new RuntimeException("Schema parsing failed : "+e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public Tuple getNext() throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	/**
+	 * Splits the input at {@code location} and wraps every resulting
+	 * {@link InputSplit} in a {@link TableSlice}.
+	 *
+	 * @param store data storage used to build the job configuration.
+	 * @param location table location(s) to slice.
+	 * @return one slice per input split, in split order.
+	 * @throws IOException if the configuration or the splits cannot be computed.
+	 */
+	@Override
+	public Slice[] slice(DataStorage store, String location) throws IOException {
+		checkConf(store, location);
+		// A negative numSplits is TableInputFormat's special case for "no hint".
+		final InputSplit[] tableSplits = inputFormat.getSplits(jobConf, -1);
+		final Slice[] result = new Slice[tableSplits.length];
+		int idx = 0;
+		for (InputSplit tableSplit : tableSplits) {
+			result[idx++] = new TableSlice(jobConf, tableSplit);
+		}
+		return result;
+	}
+
+	@Override
+	public void validate(DataStorage store, String location) throws IOException {
+		checkConf(store, location);
+	}
+  
+	static class TableSlice implements Slice {
+		private static final long serialVersionUID = 1L;
+		private static final Class[] emptyArray = new Class[] {};
+    
+		private TreeMap<String, String> configMap;
+		private InputSplit split;
+    
+		transient private JobConf conf;
+		transient private String projection;
+		transient private RecordReader<BytesWritable, Tuple> scanner;
+		transient private BytesWritable key;
+
+		TableSlice(JobConf conf, InputSplit split) {
+			// hack: expecting JobConf contains nothing but a <string, string>
+			// key-value pair store.
+			configMap = new TreeMap<String, String>();
+			for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) {
+				Map.Entry<String, String> e = it.next();
+				configMap.put(e.getKey(), e.getValue());
+			}
+			this.split = split;
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (scanner == null) {
+				throw new IOException("Slice not initialized");
+			}
+			scanner.close();
+		}
+
+		@Override
+		public long getLength() {
+			try {
+				return split.getLength();
+			} catch (IOException e) {
+				throw new RuntimeException("IOException", e);
+			}
+		}
+
+		@Override
+		public String[] getLocations() {
+			try {
+				return split.getLocations();
+			} catch (IOException e) {
+				throw new RuntimeException("IOException", e);
+			}
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			if (scanner == null) {
+				throw new IOException("Slice not initialized");
+			}
+			return scanner.getPos();
+		}
+
+		@Override
+		public float getProgress() throws IOException {
+			if (scanner == null) {
+				throw new IOException("Slice not initialized");
+			}
+			return scanner.getProgress();
+		}
+
+		@Override
+		public long getStart() {
+			return 0;
+		}
+
+		@Override
+		public void init(DataStorage store) throws IOException {
+			Configuration localConf = new Configuration();
+			for (Iterator<Map.Entry<String, String>> it =
+				configMap.entrySet().iterator(); it.hasNext();) {
+				Map.Entry<String, String> e = it.next();
+				localConf.set(e.getKey(), e.getValue());
+			}
+			conf = new JobConf(localConf);
+			try
+			{
+				projection = TableInputFormat.getProjection(conf);
+			} catch (ParseException e) {
+				throw new IOException("Schema parsing failed :"+e.getMessage());
+			}
+			TableInputFormat inputFormat = new TableInputFormat();
+			scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
+			key = new BytesWritable();
+		}
+
+		@Override
+		public boolean next(Tuple value) throws IOException {
+			TypesUtils.formatTuple(value, projection);
+			return scanner.next(key, value);
+		}
+    
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			out.writeObject(configMap);
+			out.writeObject(split.getClass().getName());
+			split.write(out);
+		} 
+    
+		@SuppressWarnings("unchecked")
+		private void readObject(ObjectInputStream in) throws IOException,
+        	ClassNotFoundException {
+			configMap = (TreeMap<String, String>) in.readObject();
+			String className = (String) in.readObject();
+			Class<InputSplit> clazz = (Class<InputSplit>) Class.forName(className);
+			try {
+				Constructor<InputSplit> meth = clazz.getDeclaredConstructor(emptyArray);
+				meth.setAccessible(true);
+				split = meth.newInstance();
+			} catch (Exception e) {
+				throw new ClassNotFoundException("Cannot create instance", e);
+			}
+			split.readFields(in);
+		}
+	}
+
+	@Override
+	public DataBag bytesToBag(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public String bytesToCharArray(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public Double bytesToDouble(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public Float bytesToFloat(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public Integer bytesToInteger(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public Long bytesToLong(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+
+	@Override
+	public Tuple bytesToTuple(byte[] b) throws IOException {
+		throw new IOException("Not implemented");
+	}
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Pig StoreFunc for Table. The per-tuple callbacks are no-ops; writing is
+ * delegated to the class returned by {@link #getStorePreparationClass()}.
+ */
+public class TableStorer implements StoreFunc {
+	private static final Class[] emptyArray = new Class[] {};
+
+	private String schemaString;
+	private String storageHintString;
+
+	/**
+	 * Creates a storer with no schema and no storage hint.
+	 */
+	public TableStorer() {
+	}
+
+	/**
+	 * @param schemaStr
+	 *        schema string, returned by {@link #getSchemaString()}.
+	 * @param storageHintStr
+	 *        storage hint string, returned by {@link #getStorageHintString()}.
+	 */
+	public TableStorer(String schemaStr, String storageHintStr) throws ParseException, IOException {
+		this.schemaString = schemaStr;
+		this.storageHintString = storageHintStr;
+	}
+
+	/** No-op. */
+	@Override
+	public void bindTo(OutputStream os) throws IOException {
+	}
+
+	/** No-op. */
+	@Override
+	public void finish() throws IOException {
+	}
+
+	/** No-op. */
+	@Override
+	public void putNext(Tuple f) throws IOException {
+	}
+
+	/**
+	 * @return {@code TableOutputFormat.class}.
+	 */
+	@Override
+	public Class getStorePreparationClass() throws IOException {
+		return TableOutputFormat.class;
+	}
+
+	/**
+	 * @return the schema string given at construction, or null.
+	 */
+	public String getSchemaString() {
+		return this.schemaString;
+	}
+
+	/**
+	 * @return the storage hint string given at construction, or null.
+	 */
+	public String getStorageHintString() {
+		return this.storageHintString;
+	}
+
+	/**
+	 * Looks up the no-argument constructor of TableOutputFormat; the lookup
+	 * result is not used further.
+	 */
+	static public void main(String[] args) throws SecurityException, NoSuchMethodException {
+		Constructor noArgCtor = TableOutputFormat.class.getDeclaredConstructor(emptyArray);
+	}
+}
+
+/**
+ * 
+ * Table OutputFormat
+ * 
+ */
+class TableOutputFormat implements OutputFormat<BytesWritable, Tuple> {
+	/**
+	 * Creates a BasicTable.Writer at the store location, using the schema and
+	 * storage hint strings of the job's TableStorer, and finishes it
+	 * immediately.
+	 */
+	@Override
+	public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+		final String tableLocation = MapRedUtil.getStoreConfig(job).getLocation();
+		final TableStorer storer = (TableStorer) MapRedUtil.getStoreFunc(job);
+		final BasicTable.Writer tableWriter = new BasicTable.Writer(
+				new Path(tableLocation),
+				storer.getSchemaString(),
+				storer.getStorageHintString(),
+				false,
+				job);
+		tableWriter.finish();
+	}
+
+	/**
+	 * @return a new TableRecordWriter for the given name and job.
+	 */
+	@Override
+	public RecordWriter<BytesWritable, Tuple> getRecordWriter(FileSystem ignored,
+			JobConf job, String name, Progressable progress) throws IOException {
+		final RecordWriter<BytesWritable, Tuple> recordWriter = new TableRecordWriter(name, job);
+		return recordWriter;
+	}
+}
+
+/**
+ * 
+ * Table RecordWriter
+ * 
+ */
+class TableRecordWriter implements RecordWriter<BytesWritable, Tuple> {
+	final private BytesWritable KEY0 = new BytesWritable(new byte[0]); 
+	private BasicTable.Writer writer;
+	private TableInserter inserter;
+
+	public TableRecordWriter(String name, JobConf conf) throws IOException {
+		StoreConfig storeConfig = MapRedUtil.getStoreConfig(conf);
+		String location = storeConfig.getLocation();
+
+		// TODO: how to get? 1) column group splits; 2) flag of sorted-ness,
+		// compression, etc.
+		writer = new BasicTable.Writer(new Path(location), conf);
+		inserter = writer.getInserter(name, false);
+	}
+
+	@Override
+	public void close(Reporter reporter) throws IOException {
+		inserter.close();
+		writer.finish();
+	}
+
+	@Override
+	public void write(BytesWritable key, Tuple value) throws IOException {
+		if (key == null) {
+			key = KEY0;
+		}
+		inserter.insert(key, value);
+	}
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.types.Schema;
+
+  /**
+   * ColumnGroup Schema. This object is first written to a schema file when the
+   * ColumnGroup is initially created, and is used to communicate meta
+   * information among writers.
+   */
+public class CGSchema {
+	private Version version;
+   private boolean sorted;
+   private String comparator;
+   private Schema schema;
+   private String compressor = "lzo2";
+   private String serializer = "pig";
+   // tmp schema file name, used as a flag of unfinished CG
+   private final static String SCHEMA_FILE = ".schema";
+   private final static String DEFAULT_COMPARATOR = "memcmp";
+	// schema version, should be same as BasicTable's most of the time
+   private final static Version SCHEMA_VERSION =
+     new Version((short) 1, (short) 0);
+
+   /**
+    * Renders the compressor, the serializer and the schema. A schema that
+    * has not been set yet (e.g. on a default-constructed instance) is
+    * rendered as "null" instead of raising a NullPointerException.
+    */
+   @Override
+   public String toString() {
+     StringBuilder sb = new StringBuilder();
+     sb.append("{Compressor = ");
+     sb.append(compressor);
+     sb.append("}\n");
+     sb.append("{Serializer = ");
+     sb.append(serializer);
+     sb.append("}\n");
+     // append(Object) is null-safe, unlike schema.toString().
+     sb.append(schema);
+
+     return sb.toString();
+   }
+   
+   public static Path makeFilePath(Path parent) {
+     return new Path(parent, SCHEMA_FILE);
+   }
+
+   public static CGSchema load(FileSystem fs, Path parent) throws IOException, ParseException {
+     if (!exists(fs, parent)) return null;
+     CGSchema ret = new CGSchema();
+     ret.read(fs, parent);
+     return ret;
+   }
+
+   public CGSchema() {
+     this.version = SCHEMA_VERSION;
+   }
+
+   public CGSchema(Schema schema, boolean sorted) {
+     this.sorted = sorted;
+     this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+     this.schema = schema;
+     this.version = SCHEMA_VERSION;
+   }
+
+   public CGSchema(Schema schema, boolean sorted, String serializer, String compressor) {
+  	this(schema, sorted);
+  	this.serializer = serializer;
+  	this.compressor = compressor;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     CGSchema other = (CGSchema) obj;
+     return this.sorted == other.sorted
+         && this.comparator.equals(other.comparator)
+         && this.schema.equals(other.schema);
+   }
+
+   public static boolean exists(FileSystem fs, Path parent) {
+     try {
+       return fs.exists(makeFilePath(parent));
+     }
+     catch (IOException e) {
+       return false;
+     }
+   }
+
+   public static void drop(FileSystem fs, Path parent) throws IOException {
+     fs.delete(makeFilePath(parent), true);
+   }
+
+   public boolean isSorted() {
+     return sorted;
+   }
+
+   public String getComparator() {
+     return comparator;
+   }
+
+   public String getSerializer() {
+     return serializer;
+   }
+
+   public String getCompressor() {
+     return compressor;
+   }
+
+   /**
+    * Writes this schema to the schema file under {@code parent}: version,
+    * schema string, sorted flag (as a VInt, 1 or 0) and comparator, in that
+    * order. The file is created without overwrite and the stream is closed
+    * even when a write fails.
+    *
+    * @param fs file system to create the schema file on.
+    * @param parent directory that will contain the schema file.
+    * @throws IOException if the file cannot be created or written.
+    */
+   public void create(FileSystem fs, Path parent) throws IOException {
+     FSDataOutputStream outSchema = fs.create(makeFilePath(parent), false);
+     try {
+       version.write(outSchema);
+       WritableUtils.writeString(outSchema, schema.toString());
+       WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+       WritableUtils.writeString(outSchema, comparator);
+     } finally {
+       outSchema.close();
+     }
+   }
+
+ public  void read(FileSystem fs, Path parent) throws IOException, ParseException {
+     FSDataInputStream in = fs.open(makeFilePath(parent));
+	  version = new Version(in);
+     // verify compatibility against SCHEMA_VERSION
+     if (!version.compatibleWith(SCHEMA_VERSION)) {
+       new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION
+            + "; found in file: " + version);
+     }
+     String s = WritableUtils.readString(in);
+     schema = new Schema(s);
+     sorted = WritableUtils.readVInt(in) == 1 ? true : false;
+     comparator = WritableUtils.readString(in);
+     in.close();
+   }
+
+   public Schema getSchema() {
+     return schema;
+   }
+ }

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ColumnType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.pig.data.DataType;
+
+public enum ColumnType implements Writable {
+  ANY("any") {
+	  public byte pigDataType() {
+		  return DataType.UNKNOWN;
+	  }
+  },
+  INT("int") {
+    public byte pigDataType() {
+      return DataType.INTEGER;
+    }
+  },
+  LONG("long") {
+    public byte pigDataType() {
+      return DataType.LONG;
+    }
+  },
+  FLOAT("float") {
+    public byte pigDataType() {
+      return DataType.FLOAT;
+    }
+  },
+  DOUBLE("double") {
+    public byte pigDataType() {
+      return DataType.DOUBLE;
+    }
+  },
+  BOOL("bool") {
+    public byte pigDataType() {
+      return DataType.BOOLEAN;
+    }
+  },
+  COLLECTION("collection") {
+    public byte pigDataType() {
+      return DataType.BAG;
+    }
+  },
+  MAP("map") {
+    public byte pigDataType() {
+      return DataType.MAP;
+    }
+  },
+  RECORD("record") {
+    public byte pigDataType() {
+      return DataType.TUPLE;
+    }
+  },
+  STRING("string") {
+    public byte pigDataType() {
+      return DataType.CHARARRAY;
+    }
+  },
+  BYTES("bytes") {
+    public byte pigDataType() {
+      return DataType.BYTEARRAY;
+    }
+  };
+
+  private String name;
+
+  private ColumnType(String name) {
+    this.name = name;
+  }
+
+  public abstract byte pigDataType();
+
+  /**
+   * To get the type based on the type name string. The lookup is
+   * case-insensitive and independent of the JVM's default locale.
+   * 
+   * @param name name of the type
+   * 
+   * @return ColumnType Enum for the type
+   * @throws IllegalArgumentException if no type has the given name
+   * @throws NullPointerException if name is null
+   */
+  public static ColumnType getTypeByName(String name) {
+    // Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotted I)
+    // that would turn "int" into a string that matches no enum constant.
+    return ColumnType.valueOf(name.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  public static ColumnType getTypeByPigDataType(byte dt) {
+    for (ColumnType ct : ColumnType.values()) {
+      if (ct.pigDataType() == dt) {
+        return ct;
+      }
+    }
+    return null;
+  }
+
+  public static String findTypeName(ColumnType columntype) {
+	  return columntype.getName();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // no op, instantiated by the caller
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Utils.writeString(out, name);
+  }
+
+  public static boolean isSchemaType(ColumnType columnType) {
+	  return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/FieldType.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.pig.data.DataType;
+
+public enum FieldType implements Writable {
+  INT("int") {
+    public byte pigDataType() {
+      return DataType.INTEGER;
+    }
+  },
+
+  LONG("long") {
+    public byte pigDataType() {
+      return DataType.LONG;
+    }
+  },
+
+  FLOAT("float") {
+    public byte pigDataType() {
+      return DataType.FLOAT;
+    }
+  },
+
+  DOUBLE("double") {
+    public byte pigDataType() {
+      return DataType.DOUBLE;
+    }
+  },
+
+  BOOL("bool") {
+    public byte pigDataType() {
+      return DataType.BOOLEAN;
+    }
+  },
+
+  COLLECTION("collection") {
+    public byte pigDataType() {
+      return DataType.BAG;
+    }
+
+    public boolean isNested() {
+      return true;
+    }
+
+    public int getMaxNumsNestedField() {
+      return MAX_NUM_FIELDS;
+    }
+  },
+
+  MAP("map") {
+    public byte pigDataType() {
+      return DataType.MAP;
+    }
+
+    public boolean isNested() {
+      return true;
+    }
+
+    public int getMaxNumsNestedField() {
+      // map can only contain one field for values
+      return 1;
+    }
+  },
+
+  RECORD("record") {
+    public byte pigDataType() {
+      return DataType.TUPLE;
+    }
+
+    public boolean isNested() {
+      return true;
+    }
+
+    public int getMaxNumsNestedField() {
+      return MAX_NUM_FIELDS;
+    }
+  },
+
+  STRING("string") {
+    public byte pigDataType() {
+      return DataType.CHARARRAY;
+    }
+  },
+
+  BYTES("bytes") {
+    // Maps to Pig's BYTEARRAY, the same mapping ColumnType.BYTES uses, so
+    // that getTypeByPigDataType(DataType.BYTEARRAY) resolves to BYTES.
+    // NOTE(review): the previous mapping was DataType.BIGCHARARRAY — confirm
+    // nothing depends on that value.
+    public byte pigDataType() {
+      return DataType.BYTEARRAY;
+    }
+  };
+
+  public /*final*/ static int MAX_NUM_FIELDS = 4096;
+
+  private static Map<Byte, FieldType> mapPigDT2This =
+      new HashMap<Byte, FieldType>(FieldType.values().length);
+
+  private String name;
+
+  static {
+    for (FieldType t : FieldType.values()) {
+      mapPigDT2This.put(t.pigDataType(), t);
+    }
+  }
+
+  private FieldType(String name) {
+    this.name = name;
+  }
+
+  public abstract byte pigDataType();
+
+  /**
+   * To get the type based on the name of the type. The lookup is
+   * case-insensitive and independent of the JVM's default locale.
+   * 
+   * @param name name of the type
+   * 
+   * @return FieldType Enum for the type
+   * @throws IllegalArgumentException if no type has the given name
+   * @throws NullPointerException if name is null
+   */
+  public static FieldType getTypeByName(String name) {
+    // Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotted I)
+    // that would turn "int" into a string that matches no enum constant.
+    return FieldType.valueOf(name.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  public static FieldType getTypeByPigDataType(byte dt) {
+    return mapPigDT2This.get(dt);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String toString() {
+    return name;
+  }
+
+  public boolean isNested() {
+    return false;
+  }
+
+  public int getMaxNumsNestedField() {
+    return 0;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // no op, instantiated by the caller
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Utils.writeString(out, name);
+  }
+}

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ParseException.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Generated By:JavaCC: Do not edit this line. ParseException.java Version 4.1 */
+/* JavaCCOptions:KEEP_LINE_COL=null */
+package org.apache.hadoop.zebra.types;
+
+/**
+ * This exception is thrown when parse errors are encountered.
+ * You can explicitly create objects of this exception type by
+ * calling the method generateParseException in the generated
+ * parser.
+ *
+ * You can modify this class to customize your error reporting
+ * mechanisms so long as you retain the public fields.
+ *
+ * The exception message is built once, at construction time, by
+ * the static helper initialise(); the public fields are kept only
+ * so that callers can inspect the parser state afterwards.
+ */
+public class ParseException extends Exception {
+
+  /**
+   * The version identifier for this Serializable class.
+   * Increment only if the <i>serialized</i> form of the
+   * class changes.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * This constructor is used by the method "generateParseException"
+   * in the generated parser.  Calling this constructor generates
+   * a new object of this type with the fields "currentToken",
+   * "expectedTokenSequences", and "tokenImage" set.
+   *
+   * None of the arguments may be null: they are dereferenced while
+   * the message is being built.
+   */
+  public ParseException(Token currentTokenVal,
+                        int[][] expectedTokenSequencesVal,
+                        String[] tokenImageVal
+                       )
+  {
+    super(initialise(currentTokenVal, expectedTokenSequencesVal, tokenImageVal));
+    currentToken = currentTokenVal;
+    expectedTokenSequences = expectedTokenSequencesVal;
+    tokenImage = tokenImageVal;
+  }
+
+  /**
+   * The following constructors are for use by you for whatever
+   * purpose you can think of.  Constructing the exception in this
+   * manner makes the exception behave in the normal way - i.e., as
+   * documented in the class "Throwable".  The fields "currentToken",
+   * "expectedTokenSequences", and "tokenImage" do not contain
+   * relevant information (they are left null).  The JavaCC generated
+   * code does not use these constructors.
+   */
+
+  public ParseException() {
+    super();
+  }
+
+  /** Constructor with message. */
+  public ParseException(String message) {
+    super(message);
+  }
+
+
+  /**
+   * This is the last token that has been consumed successfully.  If
+   * this object has been created due to a parse error, the token
+   * following this token will (therefore) be the first error token.
+   */
+  public Token currentToken;
+
+  /**
+   * Each entry in this array is an array of integers.  Each array
+   * of integers represents a sequence of tokens (by their ordinal
+   * values) that is expected at this point of the parse.
+   */
+  public int[][] expectedTokenSequences;
+
+  /**
+   * This is a reference to the "tokenImage" array of the generated
+   * parser within which the parse error occurred.  This array is
+   * defined in the generated ...Constants interface.
+   */
+  public String[] tokenImage;
+
+  /**
+   * It uses "currentToken" and "expectedTokenSequences" to generate a parse
+   * error message and returns it.  If this object has been created
+   * due to a parse error, and you do not catch it (it gets thrown
+   * from the parser) the correct error message
+   * gets displayed.
+   *
+   * The message has the form "Encountered ... at line L, column C."
+   * followed by "Was expecting:" (exactly one expected sequence) or
+   * "Was expecting one of:" and one line per expected sequence.  As
+   * many tokens after "currentToken" are printed as the longest
+   * expected sequence is long, stopping early at a token of kind 0.
+   */
+  private static String initialise(Token currentToken,
+                           int[][] expectedTokenSequences,
+                           String[] tokenImage) {
+    String eol = System.getProperty("line.separator", "\n");
+    StringBuffer expected = new StringBuffer();
+    int maxSize = 0; // length of the longest expected sequence
+    for (int i = 0; i < expectedTokenSequences.length; i++) {
+      if (maxSize < expectedTokenSequences[i].length) {
+        maxSize = expectedTokenSequences[i].length;
+      }
+      for (int j = 0; j < expectedTokenSequences[i].length; j++) {
+        expected.append(tokenImage[expectedTokenSequences[i][j]]).append(' ');
+      }
+      if (expectedTokenSequences[i][expectedTokenSequences[i].length - 1] != 0) { // sequence does not end with token kind 0
+        expected.append("...");
+      }
+      expected.append(eol).append("    ");
+    }
+    String retval = "Encountered \"";
+    Token tok = currentToken.next; // first token after the last successfully consumed one
+    for (int i = 0; i < maxSize; i++) {
+      if (i != 0) retval += " ";
+      if (tok.kind == 0) { // kind 0: print its image and stop looking further
+        retval += tokenImage[0];
+        break;
+      }
+      retval += " " + tokenImage[tok.kind];
+      retval += " \"";
+      retval += add_escapes(tok.image);
+      retval += " \"";
+      tok = tok.next;
+    }
+    retval += "\" at line " + currentToken.next.beginLine + ", column " + currentToken.next.beginColumn;
+    retval += "." + eol;
+    if (expectedTokenSequences.length == 1) {
+      retval += "Was expecting:" + eol + "    ";
+    } else {
+      retval += "Was expecting one of:" + eol + "    ";
+    }
+    retval += expected.toString();
+    return retval;
+  }
+
+  /**
+   * The end of line string for this machine.
+   * Not read anywhere in this class; initialise() looks the
+   * property up itself.
+   */
+  protected String eol = System.getProperty("line.separator", "\n");
+
+  /**
+   * Used to convert raw characters to their escaped version
+   * when these raw versions cannot be used as part of an ASCII
+   * string literal.
+   *
+   * NUL characters are dropped; backspace, tab, newline, form feed,
+   * carriage return, both quote characters and the backslash get
+   * their two-character escape; any other character below 0x20 or
+   * above 0x7e is written as a backslash, the letter u and four
+   * hexadecimal digits; everything else is copied unchanged.
+   */
+  static String add_escapes(String str) {
+      StringBuffer retval = new StringBuffer();
+      char ch;
+      for (int i = 0; i < str.length(); i++) {
+        switch (str.charAt(i))
+        {
+           case 0 :
+              continue; // NUL is skipped, nothing is appended
+           case '\b':
+              retval.append("\\b");
+              continue;
+           case '\t':
+              retval.append("\\t");
+              continue;
+           case '\n':
+              retval.append("\\n");
+              continue;
+           case '\f':
+              retval.append("\\f");
+              continue;
+           case '\r':
+              retval.append("\\r");
+              continue;
+           case '\"':
+              retval.append("\\\"");
+              continue;
+           case '\'':
+              retval.append("\\\'");
+              continue;
+           case '\\':
+              retval.append("\\\\");
+              continue;
+           default:
+              if ((ch = str.charAt(i)) < 0x20 || ch > 0x7e) {
+                 String s = "0000" + Integer.toString(ch, 16); // left-pad so the last four characters are the hex code
+                 retval.append("\\u" + s.substring(s.length() - 4, s.length()));
+              } else {
+                 retval.append(ch);
+              }
+              continue;
+        }
+      }
+      return retval.toString();
+   }
+
+}
+/* JavaCC - OriginalChecksum=90dec5ae3b18277c2181d26949c6c608 (do not edit this line) */

Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import org.apache.pig.data.Tuple;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.StringReader;
+import org.apache.hadoop.io.BytesWritable;
+import java.io.IOException;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * Partition is a column group management class. Its responsibilities include
+ * storing column groups, handle column groups spliting and stitching by
+ * insertions and queries respectively.
+ */
+public class Partition {
+  public enum SplitType { // kinds of split that can be set on a (sub)column; a field's split type starts out as NONE
+    NONE, RECORD, COLLECTION, MAP
+  }
+
+  /*
+   * This class holds the column group information as generated by
+   * TableStorageParser.
+   */
+  class PartitionInfo {
+    private Map<Schema.ColumnSchema, PartitionFieldInfo> fieldMap =
+        new HashMap<Schema.ColumnSchema, PartitionFieldInfo>();
+    private Map<String, HashSet<ColumnMappingEntry>> mColMap =
+        new HashMap<String, HashSet<ColumnMappingEntry>>();
+    private Schema mSchema;
+
+    public PartitionInfo(Schema schema) { // schema: the table schema whose columns generateDefaultCGSchema() walks
+      mSchema = schema;
+    }
+    
+    public Map<String, HashSet<ColumnMappingEntry>> getColMap() { // hands out the internal name-to-entries map itself, not a copy
+        return mColMap;
+    }
+
+    /*
+     * holds a mapping between a column in the table schema and its
+     * corresponding (sub)column group index and the field index inside the
+     * column group
+     *
+     * Entries are ordered by column group index first and by field index
+     * second (see compareTo). equals/hashCode are not overridden, so
+     * hash-based collections compare entries by identity.
+     */
+    class ColumnMappingEntry implements Comparable<ColumnMappingEntry>{
+      private int cgIndex = -1, fieldIndex = -1;
+      private Schema.ColumnSchema fs;
+      private HashSet<String> keySet;
+
+      /**
+       * @param ri column group index
+       * @param fi field index inside the column group
+       * @param fs schema of the mapped column
+       */
+      public ColumnMappingEntry(int ri, int fi, Schema.ColumnSchema fs) {
+        cgIndex = ri;
+        fieldIndex = fi;
+        this.fs = fs;
+      }
+
+      /**
+       * creates an entry with both indices at -1, i.e. one for which
+       * invalid() returns true
+       */
+      public ColumnMappingEntry() {
+      }
+
+      /**
+       * add map keys
+       * return false if any key already exists but no rollback!
+       * (keys added before the first duplicate stay in the set, keys after it
+       * are not added)
+       */
+      public boolean addKeys(HashSet<String> keys)
+      {
+        if (keySet == null)
+          keySet = new HashSet<String>();
+        for (String key : keys)
+        {
+          if (!keySet.add(key))
+            return false;
+        }
+        return true;
+      }
+
+      public int getCGIndex() {
+        return cgIndex;
+      }
+
+      public int getFieldIndex() {
+        return fieldIndex;
+      }
+
+      public Schema.ColumnSchema getColumnSchema() {
+        return fs;
+      }
+
+      /**
+       * true only while both indices still have their initial value -1
+       */
+      public boolean invalid() {
+        return (cgIndex == -1 && fieldIndex == -1);
+      }
+
+      /**
+       * orders entries by column group index, then by field index
+       *
+       * Explicit comparisons are used instead of subtracting the indices so
+       * the sign of the result cannot be flipped by integer overflow.
+       *
+       * @return a negative value, zero or a positive value as this entry
+       *         sorts before, equal to or after anotherEntry
+       */
+      public int compareTo(ColumnMappingEntry anotherEntry) {
+        int otherCG = anotherEntry.getCGIndex();
+        if (this.cgIndex != otherCG)
+          return (this.cgIndex < otherCG ? -1 : 1);
+        int otherField = anotherEntry.getFieldIndex();
+        if (this.fieldIndex == otherField)
+          return 0;
+        return (this.fieldIndex < otherField ? -1 : 1);
+      }
+
+      /**
+       * returns the keys added so far, or null if addKeys was never called
+       */
+      HashSet<String> getKeys() {
+        return keySet;
+      }
+
+
+    }
+
+    /**
+     * This class holds the column group info for a (sub)column which is a unit
+     * in a column group
+     */
+    class PartitionFieldInfo {
+      private HashSet<PartitionInfo.ColumnMappingEntry> mSplitMaps =
+          new HashSet<ColumnMappingEntry>();
+      private HashSet<String> mSplitColumns = new HashSet<String>();
+      private ColumnMappingEntry mCGIndex = null;
+      private String mCGName = null; // fully qualified name
+      private SplitType stype = SplitType.NONE;
+      private boolean splitChild;
+
+      /**
+       * set a MAP key split (sub)column
+       * returns false if sanity check fails
+       */
+      boolean setKeyCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs, HashSet<String> keys) {
+        Partition.PartitionInfo.ColumnMappingEntry cme =
+              new Partition.PartitionInfo.ColumnMappingEntry( ri, fi, fs);
+        mSplitMaps.add(cme);
+        // multiple map splits on one MAP column is allowed!
+        mSplitColumns.add(name);
+        return cme.addKeys(keys);
+      }
+
+      /**
+       * set a record field split (sub)column
+       */
+      boolean setCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs) {
+        if (mCGIndex != null) return false;
+        mCGIndex = new Partition.PartitionInfo.ColumnMappingEntry(ri, fi, fs);
+        mCGName = name;
+        return true;
+      }
+
+      ColumnMappingEntry getCGIndex() {
+        return mCGIndex;
+      }
+
+      String getCGName() {
+        return mCGName;
+      }
+
+      /**
+       * set the split type of a (sub)column
+       */
+      void setSplit(SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+        if (st == stype)
+        {
+          // multiple MAP splits of a field and its children on different keys are ok
+          if (st == SplitType.MAP || cst == SplitType.MAP || splitChild == this.splitChild)
+            return;
+        }
+        if (stype != SplitType.NONE) {
+          if (childName != null)
+            name = name + "." + childName;
+          throw new ParseException("Different Split Types Set on the same field: " + name);
+        }
+        stype = st;
+        this.splitChild = splitChild;
+        if (mSplitColumns.contains(name)) {
+          if (childName != null)
+            name = name + "." + childName;
+          throw new ParseException("Split on "+name+" are set more than once");
+        }
+        mSplitColumns.add(name);
+      }
+
+      /*
+       * creates a default "catch-all" column schema if necessary
+       */
+      void generateDefaultCGSchema(Schema.ColumnSchema fs0, Schema schema,
+          int defaultCGIndex,
+          Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap, String prefix,
+          Map<Schema.ColumnSchema, PartitionFieldInfo> fmap)
+          throws ParseException {
+        if (schema == null) throw new AssertionError("Interal Logic Error.");
+        if (prefix == null) prefix = fs0.name;
+        else if (fs0.name != null && !fs0.name.isEmpty())
+          prefix += "." + fs0.name;
+        Schema.ColumnSchema fs;
+        for (int i = 0; i < fs0.schema.getNumColumns(); i++) {
+          fs = fs0.schema.getColumn(i);
+          PartitionFieldInfo pi;
+          if ((pi = fmap.get(fs)) == null)
+            fmap.put(fs, pi = new PartitionFieldInfo());
+
+          /*
+           * won't go down for MAP split because only one level MAP split is
+           * supported now
+           */
+          if (pi.stype != SplitType.NONE && pi.stype != SplitType.MAP) {
+            /* go to the lower level */
+            pi.generateDefaultCGSchema(fs, schema, defaultCGIndex, colmap,
+                prefix, fmap);
+          }
+          else if (pi.mCGIndex != null) {
+            HashSet<ColumnMappingEntry> cms = mColMap.get(pi.mCGName);
+            if (cms == null)
+            {
+              cms = new HashSet<ColumnMappingEntry>();
+              colmap.put(pi.mCGName, cms);
+            }
+            cms.add(pi.mCGIndex);
+            if (!pi.mSplitMaps.isEmpty()) {
+              for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator();
+                   it.hasNext(); )
+              {
+                cms.add(it.next());
+              }
+            }
+          } else {
+            HashSet<ColumnMappingEntry> cms = colmap.get(prefix+"."+fs.name);
+            if (cms == null)
+            {
+              cms = new HashSet<ColumnMappingEntry>();
+              colmap.put(prefix+"."+fs.name, cms);
+            }
+            pi.mCGIndex = new ColumnMappingEntry(defaultCGIndex,
+                  schema.getNumColumns(), fs);
+            pi.mCGName = prefix+"."+fs.name;
+            cms.add(pi.mCGIndex);
+
+            schema.add(new Schema.ColumnSchema(prefix + "." + fs.name,
+                fs.schema, fs.type));
+            if (!pi.mSplitMaps.isEmpty()) {
+              for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator();
+                   it.hasNext(); )
+              {
+                cms.add(it.next());
+              }
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Registers a MAP key split on the given column: the field info of fs is
+     * looked up (and created on first use) and the split is recorded on it
+     * with the first column of fs's own schema.
+     */
+    boolean setKeyCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name, HashSet<String> keys) {
+      PartitionFieldInfo fieldInfo = fieldMap.get(fs);
+      if (fieldInfo == null) {
+        fieldInfo = new PartitionFieldInfo();
+        fieldMap.put(fs, fieldInfo);
+      }
+      Schema.ColumnSchema firstColumn = fs.schema.getColumn(0);
+      return fieldInfo.setKeyCGIndex(ri, fi, name, firstColumn, keys);
+    }
+
+    /**
+     * Registers a record field split on the given column: the field info of
+     * fs is looked up (and created on first use) and asked to store the
+     * column group mapping. Returns whatever the field info returns.
+     */
+    boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
+      PartitionFieldInfo fieldInfo = fieldMap.get(fs);
+      if (fieldInfo == null) {
+        fieldInfo = new PartitionFieldInfo();
+        fieldMap.put(fs, fieldInfo);
+      }
+      return fieldInfo.setCGIndex(ri, fi, name, fs);
+    }
+
+    /**
+     * Records the split type of the given column on its field info, creating
+     * the field info on first use. A ParseException raised by the field info
+     * is passed on to the caller.
+     */
+    void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+      PartitionFieldInfo fieldInfo = fieldMap.get(fs);
+      if (fieldInfo == null) {
+        fieldInfo = new PartitionFieldInfo();
+        fieldMap.put(fs, fieldInfo);
+      }
+      fieldInfo.setSplit(st, cst, name, childName, splitChild);
+    }
+
+    /*
+     * creates a default "catch-all" CG schema if necessary
+     *
+     * Walks every top-level column of the table schema. A column whose split
+     * type is neither NONE nor MAP is handed to its PartitionFieldInfo to be
+     * expanded one level down; a column that already has a column group
+     * mapping is only recorded in mColMap; any other column is appended to
+     * the default column group (index defaultCGIndex) and recorded in mColMap
+     * together with its MAP-key split entries.
+     *
+     * Returns null when no column ended up in the default column group,
+     * otherwise a CGSchema built from the collected columns, the serializer
+     * and the compressor.
+     */
+    public CGSchema generateDefaultCGSchema(String compressor,
+        String serializer, final int defaultCGIndex) throws ParseException {
+      Schema schema = new Schema(); // columns of the default column group, filled while walking
+      Schema.ColumnSchema fs;
+      for (int i = 0; i < mSchema.getNumColumns(); i++) {
+        fs = mSchema.getColumn(i);
+        PartitionFieldInfo pi;
+        if ((pi = fieldMap.get(fs)) == null)
+          fieldMap.put(fs, pi = new PartitionFieldInfo());
+
+        /*
+         * won't go down for MAP split because only one level MAP split is
+         * supported now
+         */
+        if (pi.stype != SplitType.NONE && pi.stype != SplitType.MAP)
+        {
+          /* go to the lower level */
+          pi.generateDefaultCGSchema(fs, schema, defaultCGIndex, mColMap, null,
+             fieldMap);
+        } else if (pi.mCGIndex != null) {
+          HashSet<ColumnMappingEntry> cms = mColMap.get(pi.mCGName);
+          if (cms == null)
+          {
+            cms = new HashSet<ColumnMappingEntry>();
+            mColMap.put(pi.mCGName, cms);
+          }
+          cms.add(pi.mCGIndex); // unlike the nested variant, the MAP-key split entries are not added here
+        }
+        else {
+          HashSet<ColumnMappingEntry> cms = mColMap.get(fs.name);
+          if (cms == null)
+          {
+            cms = new HashSet<ColumnMappingEntry>();
+            mColMap.put(fs.name, cms);
+          }
+          ColumnMappingEntry cme = new ColumnMappingEntry(defaultCGIndex,
+                schema.getNumColumns(), fs);
+          cms.add(cme);
+          setCGIndex(fs, defaultCGIndex, schema.getNumColumns(), fs.name); // stores a second, separate entry with the same indices as the field's mapping
+          schema.add(fs); // must come after the two getNumColumns() calls above, which supply the field index
+          for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator();
+               it.hasNext(); )
+          {
+            cms.add(it.next());
+          }
+        }
+      }
+      CGSchema defaultSchema =
+          (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false,
+              serializer, compressor));
+      return defaultSchema;
+    }
+
+    /**
+     * returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
+     * across different hash keys
+     *
+     * The lookup goes through this object's own field map, like the other
+     * accessors of this class, so it does not depend on the enclosing
+     * Partition having already stored this object. A field info is created
+     * and registered for fs if there is none yet, hence the result is never
+     * null (it is empty when no MAP key split has been set).
+     */
+    public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
+        Schema.ColumnSchema fs) {
+      PartitionFieldInfo pi;
+      if ((pi = fieldMap.get(fs)) == null) {
+        pi = new PartitionFieldInfo();
+        fieldMap.put(fs, pi);
+      }
+      return pi.mSplitMaps;
+    }
+  }
+
+  /*
+   * Bookkeeping for one column group: which partitioned columns use it, the
+   * projection each of them asked for, and the tuple the column group's data
+   * is exchanged through.
+   */
+  private class CGEntry {
+    // partitioned columns that need this column group, in registration order
+    private ArrayList<PartitionedColumn> mSources = null;
+    // projection of each user, parallel to mSources
+    private ArrayList<String> mProjections = null;
+    private int mSize = 0; // number of users
+    private int mCGIndex = -1;
+    private Tuple mTuple = null;
+    // map of a projection index to a set of interested hash keys
+    private HashMap<Integer, HashSet<String>> mKeyMap;
+
+    CGEntry(int cgindex) {
+      mSources = new ArrayList<PartitionedColumn>();
+      mProjections = new ArrayList<String>();
+      mCGIndex = cgindex;
+    }
+
+    /**
+     * registers a user; its projection index becomes its registration order
+     */
+    void addUser(Partition.PartitionedColumn src, String projection) {
+      mSources.add(src);
+      mProjections.add(projection);
+      src.setProjIndex(mSize);
+      mSize++;
+    }
+
+    /**
+     * registers a user that is only interested in the given hash keys
+     */
+    void addUser(Partition.PartitionedColumn src, String projection, HashSet<String> keys) {
+      if (mKeyMap == null) {
+        mKeyMap = new HashMap<Integer, HashSet<String>>();
+      }
+      // the keys are filed under the index the user is about to receive
+      mKeyMap.put(mSize, keys);
+      addUser(src, projection);
+    }
+
+    /**
+     * copies every user's record into the tuple at the user's projection index
+     */
+    void insert(final BytesWritable key) throws ExecException {
+      for (int i = 0; i < mSize; i++) {
+        PartitionedColumn user = mSources.get(i);
+        mTuple.set(user.getProjIndex(), user.getRecord());
+      }
+    }
+
+    /**
+     * hands every user the tuple field at the user's projection index
+     */
+    void read() throws ExecException {
+      for (int i = 0; i < mSize; i++) {
+        PartitionedColumn user = mSources.get(i);
+        user.setRecord(mTuple.get(user.getProjIndex()));
+      }
+    }
+
+    void setSource(Tuple tuple) {
+      mTuple = tuple;
+    }
+
+    int getCGIndex() {
+      return mCGIndex;
+    }
+
+    /**
+     * return untyped projection schema: the projections joined by ",", each
+     * one followed by "#{k1|k2|...}" when hash keys were registered for it
+     */
+    String getProjection() throws ParseException {
+      StringBuilder buf = new StringBuilder();
+      for (int i = 0; i < mProjections.size(); i++) {
+        if (i > 0) buf.append(",");
+        buf.append(mProjections.get(i));
+        HashSet<String> keySet = (mKeyMap == null ? null : mKeyMap.get(i));
+        if (keySet == null)
+          continue;
+        if (keySet.isEmpty())
+          throw new AssertionError(
+            "Internal Logical Error: Empty key map.");
+        buf.append("#{");
+        boolean first = true;
+        for (String hashKey : keySet)
+        {
+          if (!first)
+            buf.append("|");
+          buf.append(hashKey);
+          first = false;
+        }
+        buf.append("}");
+      }
+      return buf.toString();
+    }
+  }
+
+  /**
+   * stitch and split execution class
+   *
+   * A node holds one record (a Tuple, or a Map for MAP-key splits) plus the
+   * child nodes it exchanges data with: stitch() copies the children's
+   * records into this node's record, split() hands parts of this node's
+   * record out to the children.
+   */
+  private class PartitionedColumn {
+    private ArrayList<PartitionedColumn> mChildren = null; // partitioned
+    // children
+    private int mChildrenLen = 0;
+    private int mFieldIndex = -1; // field index in parent
+    private int mProjIndex = -1; // field index in CG projection: only used by
+    // a leaf
+    private Partition.SplitType mSplitType = Partition.SplitType.NONE;
+    private Object mTuple = null;
+    private boolean mNeedTmpTuple;
+    private HashSet<String> mKeys; // interested hash keys
+
+    PartitionedColumn(int fi, boolean needTmpTuple)
+        throws IOException {
+      mFieldIndex = fi;
+      mNeedTmpTuple = needTmpTuple;
+    }
+
+    PartitionedColumn(int fi, Partition.SplitType st,
+        boolean needTmpTuple) throws IOException {
+      this(fi, needTmpTuple);
+      mSplitType = st;
+    }
+
+    void setKeys(HashSet<String> keys) {
+      mKeys = keys;
+    }
+
+    /**
+     * stitch op
+     *
+     * With split type NONE the record is a Tuple and every child's record is
+     * stored at the child's field index; with any other split type the record
+     * is a Map and every child's map is merged into it.
+     */
+    @SuppressWarnings("unchecked")
+    void stitch() throws IOException {
+      PartitionedColumn child;
+      if (mSplitType == Partition.SplitType.NONE) {
+        for (int i = 0; i < mChildrenLen; i++) {
+          child = mChildren.get(i);
+          ((Tuple) mTuple).set(child.mFieldIndex, child.getRecord());
+        }
+      }
+      else {
+        // stitch MAP-key partitioned hashes
+        for (int i = 0; i < mChildrenLen; i++) {
+          child = mChildren.get(i);
+          // add the new (key,value) to the existing map
+          ((Map<String, Object>) mTuple)
+              .putAll((Map<String, Object>)child.getRecord());
+        }
+      }
+    }
+
+    /**
+     * split op
+     *
+     * With split type NONE the record is a Tuple and every child receives the
+     * field at its field index. Otherwise the record is a Map: it is replaced
+     * by a shallow copy, and every entry whose key a child is interested in
+     * is moved from that copy into the child's map.
+     */
+    @SuppressWarnings("unchecked")
+    void split() throws IOException {
+      PartitionedColumn child;
+      if (mSplitType == SplitType.NONE) {
+        // record split
+        for (int i = 0; i < mChildrenLen; i++) {
+          child = mChildren.get(i);
+          child.setRecord(((Tuple) mTuple).get(child.mFieldIndex));
+        }
+      }
+      else {
+        // split MAP columns excluding the hashes already in split key CGs
+        String key;
+
+        // make a copy so the input MAP is intact after keys are yanked; the
+        // copy constructor accepts any Map implementation, not only HashMap
+        Map<String, Object> map_column =
+            new HashMap<String, Object>((Map<String, Object>) mTuple);
+        mTuple = map_column;
+        Map<String, Object> childMap;
+        Object value;
+        for (int i = 0; i < mChildrenLen; i++) {
+          child = mChildren.get(i);
+          childMap = (Map<String, Object>) child.getRecord();
+          for (Iterator<String> it = child.mKeys.iterator(); it.hasNext();)
+          {
+            key = it.next();
+            if ((value = map_column.get(key)) != null)
+            {
+              childMap.put(key, value);
+              map_column.remove(key);
+            }
+          }
+        }
+      }
+    }
+
+    Object getRecord() {
+      return mTuple;
+    }
+
+    void setRecord(Object t) {
+      mTuple = t;
+    }
+
+    void addChild(PartitionedColumn child) {
+      if (mChildren == null) mChildren = new ArrayList<PartitionedColumn>();
+      mChildren.add(child);
+      mChildrenLen++;
+    }
+
+    void setProjIndex(int projindex) {
+      mProjIndex = projindex;
+    }
+
+    int getProjIndex() {
+      return mProjIndex;
+    }
+
+    /**
+     * allocates the record as a tuple with one field per child (at least one
+     * field), but only if this node was created with needTmpTuple set
+     */
+    void createTmpTuple() throws IOException {
+      if (mNeedTmpTuple)
+      {
+        int size = (mChildrenLen > 0 ? mChildrenLen : 1);
+        mTuple = TypesUtils.createTuple(size);
+      }
+    }
+
+    /**
+     * create maps if necessary
+     */
+    void createMap()
+    {
+      mTuple = new HashMap<String, Object>();
+    }
+
+    /**
+     * clear map
+     */
+    void clearMap()
+    {
+      ((Map<?, ?>) mTuple).clear();
+    }
+  }
+
+  private HashMap<Integer, CGEntry> mCGs = null; // involved CGs
+  private ArrayList<PartitionedColumn> mExecs = null; // stitches to be
+  // performed in sequence:
+  // called by LOAD
+  private int mStitchSize = 0; // number of the stitches
+  private int mSplitSize = 0; // number of the splits
+  private Schema mSchema = null;
+  private CGSchema[] mCGSchemas;
+  private PartitionInfo mPartitionInfo;
+  private Projection mProjection = null;
+  private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
+  private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+
+  /*
+   * ctor used for LOAD
+   *
+   * Parses the storage specification into column group schemas, then walks
+   * the columns of the projection schema. A column for which getColMapping
+   * finds column group entries is either registered as a user of exactly one
+   * column group or, when it has MAP-key splits, handed to handleMapStitch;
+   * any other column is treated as a composite of column groups and gets a
+   * stitch tree built by buildStitch. Null projection columns, and composite
+   * columns that cannot be resolved in the table schema, are skipped. The
+   * top-level stitch is appended to the execution list last.
+   */
+  public Partition(Schema schema, Projection projection, String storage)
+      throws ParseException, IOException {
+    mSchema = schema;
+    TableStorageParser sparser =
+        new TableStorageParser(new StringReader(storage), this, mSchema);
+    mPartitionInfo = new PartitionInfo(schema); // assigned before StorageSchema() runs below
+    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+    mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
+    mProjection = projection;
+    Schema projSchema = projection.getProjectionSchema();
+    int size = projSchema.getNumColumns();
+    HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices;
+    PartitionInfo.ColumnMappingEntry cgindex;
+    mCGs = new HashMap<Integer, CGEntry>();
+    Schema.ColumnSchema fs;
+    CGEntry cgentry;
+    Schema.ParsedName pname = new Schema.ParsedName(); // one instance, re-targeted for every column
+    mExecs = new ArrayList<PartitionedColumn>();
+    PartitionedColumn parCol, curCol = new PartitionedColumn(-1, false); // top
+    // level:
+    // target
+    // of
+    // stitch
+    String name;
+    HashSet<String> projectedKeys;
+    for (int i = 0; i < size; i++) // depth-first
+    {
+      if (projSchema.getColumn(i) == null)
+        continue;
+      name = projSchema.getColumn(i).name;
+      pname.setName(name);
+      projectedKeys = (projection.getKeys() == null ? null :
+                       projection.getKeys().get(projSchema.getColumn(i)));
+      cgindices = getColMapping(schema, name, pname, projectedKeys);
+      if (cgindices != null) {
+        // either needs split of a CG column or the projection is a CG proper
+        fs = schema.getColumnSchema(name);
+        if (getSplitMap(fs).isEmpty()) {
+          if (cgindices.size() != 1)
+            throw new AssertionError( "Internal Logical Error: one RECORD split is expected.");
+          Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+          Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+          mapentry = entrySet.iterator().next(); // the single entry checked for above
+          cgindex = mapentry.getKey();
+          if (cgindex == null)
+            throw new AssertionError( "Internal Logical Error: RECORD does not have a CG index.");
+          if (mapentry.getValue() != null)
+            throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
+          cgentry = getCGEntry(cgindex.getCGIndex());
+          parCol = new PartitionedColumn(i, true);
+          mPCNeedTmpTuple.add(parCol);
+          cgentry.addUser(parCol, name);
+          curCol.addChild(parCol);
+        } else {
+          // leaves are not added to exec!
+          if (!getSplitMap(fs).isEmpty()) { // re-checks the condition already implied by the enclosing else
+            // this subtype is MAP-split
+            handleMapStitch(pname, curCol, fs, i,
+                getCGIndex(fs).getFieldIndex(), cgindices);
+          }
+        }
+      }
+      else {
+        // a composite column of CGs
+        fs = schema.getColumnSchema(name);
+        if (fs == null) continue;
+        parCol = new PartitionedColumn(i, true);
+        mPCNeedTmpTuple.add(parCol);
+        buildStitch(fs, pname, parCol); // depth-first
+        mExecs.add(parCol);
+        mStitchSize++;
+        curCol.addChild(parCol);
+      }
+    }
+    for (int i = 0; i < mPCNeedTmpTuple.size(); i++)
+      mPCNeedTmpTuple.get(i).createTmpTuple();
+    mExecs.add(curCol); // the top-level stitch goes last, after all nested ones
+    mStitchSize++;
+  }
+
+  /*
+   * ctor used by STORE
+   *
+   * Parses the schema and storage strings, then walks the top-level columns
+   * of the table schema. A column that has a column group mapping is
+   * registered as a user of that column group, or handed to handleMapSplit
+   * when it has MAP-key splits; any other column is treated as a composite
+   * of column groups and gets a split node built by buildSplit. The
+   * top-level split is placed first in the execution list.
+   */
+  public Partition(final String schema, final String storage)
+      throws ParseException, IOException
+  {
+    TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+    mSchema = parser.RecordSchema(null);
+    mPartitionInfo = new PartitionInfo(mSchema);
+    TableStorageParser sparser =
+        new TableStorageParser(new StringReader(storage), this, mSchema);
+    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();    
+    mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
+    int size = mSchema.getNumColumns();
+    PartitionInfo.ColumnMappingEntry cgindex;
+    mCGs = new HashMap<Integer, CGEntry>();
+    CGEntry cgentry;
+    mExecs = new ArrayList<PartitionedColumn>();
+    PartitionedColumn parCol, curCol = new PartitionedColumn(-1, false); // top
+    // level:
+    // source
+    // of
+    // split
+    mExecs.add(curCol); // breadth-first
+    mSplitSize++;
+    Schema.ColumnSchema fs;
+    for (int i = 0; i < size; i++) {
+      fs = mSchema.getColumn(i);
+      cgindex = getCGIndex(fs);
+      if (cgindex != null) {
+        // a CG field
+        cgentry = getCGEntry(cgindex.getCGIndex());
+        // leaves are not added to exec!
+        if (getSplitMap(fs).isEmpty()) {
+          parCol = new PartitionedColumn(i, false);
+          cgentry.addUser(parCol, null); // null mean all schema
+          parCol.setProjIndex(cgindex.getFieldIndex()); // replaces the index addUser() just assigned
+          curCol.addChild(parCol);
+        } else {
+          // this subtype is MAP-split
+          // => need to add splits for all split keys
+          handleMapSplit(curCol, fs, i, cgentry);
+        }
+      }
+      else {
+        // a composite column of CGs
+        parCol = new PartitionedColumn(i, false);
+        mExecs.add(parCol); // breadth-first
+        mSplitSize++;
+        buildSplit(fs, parCol);
+        curCol.addChild(parCol);
+      }
+    }
+    
+    for (int i = 0; i < mPCNeedTmpTuple.size(); i++)
+      mPCNeedTmpTuple.get(i).createTmpTuple();
+
+    for (int i = 0; i < mPCNeedMap.size(); i++)
+      mPCNeedMap.get(i).createMap();
+  }
+
+  /**
+   * Accessor for the table schema held by this partition.
+   *
+   * @return the current value of the mSchema field
+   */
+  public Schema getSchema() {
+    return this.mSchema;
+  }
+
+  /**
+   * Accessor for the partition info that the parser populated.
+   *
+   * @return the current value of the mPartitionInfo field
+   */
+  public PartitionInfo getPartitionInfo() {
+    return this.mPartitionInfo;
+  }
+
+  /**
+   * Accessor for the schemas of all column groups.
+   *
+   * @return the mCGSchemas array itself (no copy is made)
+   */
+  public CGSchema[] getCGSchemas() {
+    return this.mCGSchemas;
+  }
+
+  /**
+   * Looks up the schema of one column group.
+   *
+   * @param index position of the column group in the mCGSchemas array
+   * @return the schema stored at that position, or null when mCGSchemas has
+   *         not been set
+   */
+  public CGSchema getCGSchema(int index) {
+    return (mCGSchemas == null) ? null : mCGSchemas[index];
+  }
+
+  /*
+   * Resolves a (possibly nested) column name to the column group mapping
+   * entries that serve it.
+   *
+   * Search order (original contract): search from the most specific name
+   * until the least specific; if none is found return null. The name is first
+   * looked up as-is in mPartitionInfo.mColMap; when that misses, the name is
+   * repeatedly truncated at its last '.' or '#' separator and the shorter
+   * ancestor name is looked up instead.
+   *
+   * Outcomes visible in the code below:
+   *  - exact hit, projectedKeys == null: pn.mDT is set to ANY and the first
+   *    entry whose getKeys() is null is returned (mapped to null); if every
+   *    entry has keys, null is returned. In the original author's words: for
+   *    MAP split columns the catch-all CG is returned, not the CGs with
+   *    specific keys, when this is not a key projection.
+   *  - exact hit, projectedKeys != null: pn.mDT is set to MAP and each
+   *    projected key is assigned to the entry whose key set contains it, or
+   *    otherwise to the entry whose key set is null (the default CG); the
+   *    result maps each chosen entry to the keys assigned to it.
+   *  - ancestor hit with the map flag set: same key assignment as above.
+   *  - ancestor hit without the map flag (RECORD-split): exactly one entry is
+   *    required and it is returned mapped to null.
+   *
+   * The original comment also states that the fq name portion after the
+   * matched CG's fq name is returned in pn. NOTE(review): the only code that
+   * does so is at the end of the method and looks unreachable as written
+   * (see the note there) — confirm the intended behaviour.
+   *
+   * Parameters: schema is not referenced in this method body; name is the
+   * column name to resolve; pn receives the detected type in its mDT field;
+   * projectedKeys holds the MAP keys being projected, or null.
+   *
+   * Throws AssertionError on the internal inconsistencies checked below.
+   */
+  private HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> getColMapping(Schema schema,
+      String name, Schema.ParsedName pn, HashSet<String> projectedKeys) throws ParseException {
+    Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap =
+        mPartitionInfo.mColMap;
+    int fromIndex = name.length() - 1, lastHashIndex, lastFieldIndex; // fromIndex: where the backward separator search starts
+
+    HashSet<PartitionInfo.ColumnMappingEntry> results = colmap.get(name); // try the full name first
+    HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> result = null;
+    HashSet<String> keys;
+    String ancestorName;
+    boolean map = false; // true once the match is to be handled as a MAP-split
+    if (results != null)
+    {
+      if (projectedKeys != null)
+      {
+        pn.mDT = ColumnType.MAP;
+        map = true; // falls through to the key assignment after the (skipped) search loop
+      } else {
+        pn.mDT = ColumnType.ANY;
+        PartitionInfo.ColumnMappingEntry cme;
+        for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
+        {
+          cme = it.next();
+          if (cme.getKeys() == null)
+          {
+            // no specific keys are of interest: either a MAP-split without keys or a RECORD-split is OK
+            result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+            result.put(cme, null);
+            return result;
+          }
+        }
+        return null; // every entry is key-specific: nothing serves the whole column
+      }
+    }
+
+    while (results == null) { // no exact hit: walk up through ancestor names
+      lastHashIndex = name.lastIndexOf('#', fromIndex);
+      lastFieldIndex = name.lastIndexOf('.', fromIndex);
+      if (lastHashIndex == -1 && lastFieldIndex == -1) break; // no separator left to cut at
+      else if (lastHashIndex == -1) {
+        fromIndex = lastFieldIndex;
+      }
+      else if (lastFieldIndex == -1) {
+        fromIndex = lastHashIndex;
+        map = true; // only '#' separators remain: the flag is never reset afterwards
+      }
+      else {
+        if (lastHashIndex == lastFieldIndex - 1
+            || lastFieldIndex == lastHashIndex - 1) break; // "#." or ".#" adjacent: give up
+        fromIndex =
+            (lastFieldIndex > lastHashIndex ? lastFieldIndex : lastHashIndex); // cut at the rightmost separator
+      }
+      if (fromIndex <= 0) break; // the ancestor name would be empty
+      ancestorName = name.substring(0, fromIndex);
+      results = colmap.get(ancestorName);
+      fromIndex--; // next round searches left of the separator just used
+    }
+    if (results != null) {
+      if (map)
+      {
+        /* build a HashMap from ColumnGroupMappingEntry to a set of projected keys for MAP-split.
+         * NOTE(review): projectedKeys is dereferenced below without a null
+         * check, while the search loop above can set map = true regardless
+         * of projectedKeys — confirm callers never pass null on that path. */
+        if (results.isEmpty())
+          throw new AssertionError( "Internal Logical Error: split is expected.");
+        PartitionInfo.ColumnMappingEntry thisCG, defaultCG = null; // defaultCG: last entry seen whose key set is null
+        boolean found;
+        String projectedKey;
+        for (Iterator<String> projectedKeyIt = projectedKeys.iterator(); projectedKeyIt.hasNext();)
+        {
+          projectedKey = projectedKeyIt.next();
+          found = false;
+          thisCG = null;
+          for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator();
+              it.hasNext(); )
+          {
+            thisCG = it.next();
+            keys = thisCG.getKeys();
+            if (keys == null)
+            {
+              defaultCG = thisCG;
+            } else {
+              if (keys.contains(projectedKey))
+              {
+                found = true; // thisCG is the entry that owns projectedKey
+                break;
+              }
+            }
+          }
+          
+          if (!found)
+          {
+            if (defaultCG == null)
+              throw new AssertionError( "Internal Logical Error: default MAP split CG is missing.");
+
+            thisCG =defaultCG; // no key-specific entry owns this key: use the default entry
+          }
+          if (result == null)
+            result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+          if ((keys = result.get(thisCG)) == null)
+          {
+            keys = new HashSet<String>();
+            result.put(thisCG, keys);
+          }
+          keys.add(projectedKey);
+        }
+        if (result == null) // only possible when projectedKeys was empty
+          if (results.isEmpty()) // already rejected at the top of this branch, so this cannot fire
+           throw new AssertionError( "Internal Logical Error: Default MAP split column is missing.");
+      } else {
+        // a RECORD-split
+        if (results.size() != 1)
+         throw new AssertionError(
+           "Internal Logical Error: A single split is expected.");
+        result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+        result.put(results.iterator().next(), null);
+        return result;
+      }
+      /* discard the CG fq name.
+       * NOTE(review): the non-MAP branch above always returns, so map is
+       * true whenever this point is reached and the if (!map) block below
+       * looks unreachable — confirm whether that early return is intended. */
+      fromIndex += 2;
+      if (!map)
+      {
+        // no need to crawl down after encountering a MAP-split subcolumn
+        pn.setName(name.substring(fromIndex));
+        pn.parseName(results.iterator().next().getColumnSchema());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * recursively build stitch executions
+   * @throws IOException 
+   */
+  private void buildStitch(Schema.ColumnSchema fs, Schema.ParsedName pn,
+      PartitionedColumn parent) throws ParseException, IOException {
+    // depth-first traversal
+    CGEntry cgentry = null;
+    PartitionedColumn parCol;
+
+    if (fs.schema != null && fs.schema.getNumColumns() > 0) {
+      Schema.ColumnSchema child;
+      pn.parseName(fs);
+      Schema.ParsedName oripn = new Schema.ParsedName();
+      for (int i = 0; i < fs.schema.getNumColumns(); i++) {
+        oripn.setName(new String(pn.mName), pn.mDT);
+        child = fs.schema.getColumn(i);
+        if (getCGIndex(child) == null) {
+          // not a CG: go one level lower
+          parCol = new PartitionedColumn(i, true);
+          mPCNeedTmpTuple.add(parCol);
+          buildStitch(child, oripn, parCol); // depth-first
+          mExecs.add(parCol);
+          mStitchSize++;
+          parent.addChild(parCol);
+        }
+        else {
+          if (getSplitMap(child).isEmpty()) {
+            // this subtype is not MAP-split
+            cgentry = getCGEntry(getCGIndex(child).getCGIndex());
+            parCol = new PartitionedColumn(i, true);
+            mPCNeedTmpTuple.add(parCol);
+            cgentry.addUser(parCol, getCGName(child));
+            parent.addChild(parCol);
+          }
+          else {
+            // this subtype is MAP-split
+            Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap =
+              mPartitionInfo.mColMap;
+            HashSet<PartitionInfo.ColumnMappingEntry> msplits = colmap.get(getCGName(child));
+            HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices;
+            cgindices = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
+            PartitionInfo.ColumnMappingEntry cme;
+            for (Iterator<PartitionInfo.ColumnMappingEntry> it = msplits.iterator(); it.hasNext(); )
+            {
+              cme = it.next();
+
+              // all keys must be included
+              if (cme.getKeys() == null)
+                cgindices.put(cme, cme.getKeys());
+            }
+            handleMapStitch(oripn, parent, child, i, getCGIndex(
+                child).getFieldIndex(), cgindices);
+          }
+        }
+      }
+    }
+    else throw new AssertionError(
+        "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
+  }
+
+  /**
+   * Builds the stitches for a MAP-split (sub)column.
+   * <p>
+   * Two cases are distinguished by pn.mDT:
+   * <ul>
+   * <li>ColumnType.ANY — the projection is on the whole MAP: one MAP-typed
+   * PartitionedColumn is created for the CG returned by getCGIndex(child),
+   * added to mExecs and to parent, and one child column per entry of
+   * getSplitMap(child) (at most one per CG index) is hung below it;</li>
+   * <li>otherwise — the projection is on specific keys: one column is created
+   * per entry of cgindices; when there is more than one entry, the column
+   * created for the first entry is MAP-typed, is added to mExecs and becomes
+   * the parent of the columns created for the remaining entries.</li>
+   * </ul>
+   *
+   * @param pn        parsed projection name; only its mDT field is read here
+   * @param parent    column that receives the column(s) created here
+   * @param child     schema of the MAP-split (sub)column
+   * @param i         index given to the column attached directly to parent
+   * @param fi        not referenced in this method body
+   * @param cgindices mapping entries and their projected keys, as built by
+   *                  the callers
+   * @throws AssertionError on the internal inconsistencies checked below
+   * @throws IOException declared by this method; the raising call is not
+   *           visible from here
+   */
+  private void handleMapStitch(Schema.ParsedName pn,
+      PartitionedColumn parent, Schema.ColumnSchema child, int i,
+      int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
+    CGEntry cgentry;
+    if (pn.mDT == ColumnType.ANY) {
+      // this subtype is MAP split and the projection is on the whole MAP:
+      // => need to add stitches for all split keys
+
+      // first the map partitioned column that contains all non-key-partitioned
+      // hashes; exactly one mapping entry is expected for it
+      if (cgindices == null || cgindices.size() != 1) {
+        throw new AssertionError(
+            "Internal Logical Error: Invalid map key size.");
+      }
+      
+      Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+      Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+      mapentry = entrySet.iterator().next();
+      if (mapentry.getValue() != null) // NOTE(review): message says RECORD although this is the whole-MAP path — looks copy-pasted, confirm
+        throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
+      
+      cgentry = getCGEntry(getCGIndex(child).getCGIndex());
+
+      PartitionedColumn mapParCol =
+          new PartitionedColumn(i, Partition.SplitType.MAP, true);
+      mPCNeedTmpTuple.add(mapParCol);
+      cgentry.addUser(mapParCol, getCGName(child));
+      mExecs.add(mapParCol); // not a leaf : MAP stitch needed
+      mStitchSize++;
+      parent.addChild(mapParCol);
+
+      // the map-key partitioned columns: one child per CG found in the split map
+      HashSet<PartitionInfo.ColumnMappingEntry> splitMap =
+          getSplitMap(child);
+      PartitionInfo.ColumnMappingEntry cgindex;
+      int index;
+      HashSet<Integer> projectedCGs = new HashSet<Integer>(); // CG indices already given a child column; starts empty
+      for (Iterator<PartitionInfo.ColumnMappingEntry> it = splitMap.iterator(); it.hasNext();) {
+        cgindex = it.next();
+        index = cgindex.getCGIndex();
+        cgentry = getCGEntry(index);
+        // if the CG is already included in this sub-column stitch, then no need
+        // to add it as a separate stitch since all keys therein are already included
+        if (!projectedCGs.contains(index)) // NOTE(review): the CG used for mapParCol above is never added to this set — confirm that is intended
+        {
+          PartitionedColumn parCol =
+             new PartitionedColumn(0, true);
+          mPCNeedTmpTuple.add(parCol);
+          cgentry.addUser(parCol, getCGName(child), cgindex.getKeys());
+          mapParCol.addChild(parCol); // contribute to the non-key-partitioned
+         // hashes
+          mPCNeedMap.add(parCol);
+         projectedCGs.add(index);
+        }
+      }
+    }
+    else {
+      // this sub-type is MAP split and the projection is on specific keys
+      // (the original comment says: on another key which is not a split key):
+      // => need to add a specific key stitch per mapping entry
+      if (cgindices == null) {
+        throw new AssertionError(
+            "Internal Logical Error: MAP key set is empty.");
+      }
+
+      Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
+      HashSet<String> projectedKeys;
+      Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
+      boolean needParent = (entrySet.size() > 1); // several CGs contribute: an intermediate MAP column is required
+      boolean newParent = false; // becomes true once that intermediate column exists
+      PartitionedColumn parCol;
+      for (Iterator<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> it = entrySet.iterator(); it.hasNext(); )
+      {
+        mapentry = it.next();
+        projectedKeys = mapentry.getValue();
+        cgentry = getCGEntry(mapentry.getKey().getCGIndex());
+        if (needParent)
+        {
+          parCol = new PartitionedColumn(i, Partition.SplitType.MAP, true);
+          mExecs.add(parCol); // not a leaf : MAP stitch needed
+          mStitchSize++;
+          mPCNeedMap.add(parCol);
+          parent.addChild(parCol);
+          parent = parCol; // later entries are attached below this new MAP column
+          needParent = false;
+          newParent = true;
+        } else {
+          parCol = new PartitionedColumn(newParent ? 0 : i, true); // index 0 when hung below the intermediate MAP column
+          parent.addChild(parCol);
+        }
+        mPCNeedTmpTuple.add(parCol);
+        cgentry.addUser(parCol, getCGName(child), projectedKeys); // done for the intermediate column as well
+      }
+    }
+  }
+
+  private CGEntry getCGEntry(int cgindex)
+  {
+    CGEntry result = mCGs.get(cgindex);
+    if (result == null)
+    {
+      result = new CGEntry(cgindex);
+      mCGs.put(cgindex, result);
+    }
+    return result;
+  }
+
+  /**
+   * recursively build split executions
+   * @throws IOException 
+   */
+  private void buildSplit(Schema.ColumnSchema fs, PartitionedColumn parent)
+      throws ParseException, IOException {
+    // depth-first traversal
+    CGEntry cgentry;
+    PartitionedColumn parCol;
+
+    if (fs.schema != null && fs.schema.getNumColumns() > 0) {
+      Schema.ColumnSchema child;
+      for (int i = 0; i < fs.schema.getNumColumns(); i++) {
+        child = fs.schema.getColumn(i);
+        PartitionInfo.ColumnMappingEntry cgindex = getCGIndex(child);
+        if (cgindex == null) {
+          // not a CG: go one level lower
+          parCol = new PartitionedColumn(i, false);
+          mExecs.add(parCol); // breadth-first
+          mSplitSize++;
+          parent.addChild(parCol);
+          buildSplit(child, parCol);
+        }
+        else {
+          cgentry = getCGEntry(cgindex.getCGIndex());
+          if (getSplitMap(child).isEmpty()) {
+            // this subfield is not MAP-split
+            parCol = new PartitionedColumn(i, false);
+            cgentry.addUser(parCol, getCGName(child));
+            parCol.setProjIndex(cgindex.getFieldIndex());
+            parent.addChild(parCol);
+
+          }
+          else {
+            // this subfield is MAP-split
+            // => need to add splits for all split keys
+            handleMapSplit(parent, child, i, cgentry);
+          }
+        }
+      }
+    }
+    else throw new AssertionError(
+        "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
+  }
+
+  /**
+   * build splits for MAP split (sub)columns
+   * 
+   * @throws IOException
+   */
+  private void handleMapSplit(PartitionedColumn parent,
+      Schema.ColumnSchema child, int i, CGEntry cgentry) throws ParseException, IOException {
+    // first the map partitioned column that contain all non-key-partitioned
+    // hashes
+    PartitionedColumn mapParCol =
+        new PartitionedColumn(i, Partition.SplitType.MAP, false);
+    cgentry.addUser(mapParCol, getCGName(child));
+    mExecs.add(mapParCol); // not a leaf : MAP split needed
+    mSplitSize++;
+    parent.addChild(mapParCol);
+
+    // the map-key partitioned columns:
+    HashSet<PartitionInfo.ColumnMappingEntry> splitMaps = getSplitMap(child);
+    PartitionInfo.ColumnMappingEntry cgindex;
+    int index;
+    HashSet<String> keySet;
+    for (Iterator<PartitionInfo.ColumnMappingEntry> it = splitMaps.iterator(); it.hasNext();) {
+      cgindex = it.next();
+      index = cgindex.getCGIndex();
+      cgentry = getCGEntry(index);
+      keySet = cgindex.getKeys();
+      PartitionedColumn parCol;
+      parCol = new PartitionedColumn(0, false);
+      parCol.setKeys(keySet);
+      cgentry.addUser(parCol, getCGName(child), keySet);
+      parCol.setProjIndex(cgindex.getFieldIndex());
+      // contribute to the non-key-partitioned hashes
+      mapParCol.addChild(parCol);
+      mPCNeedMap.add(parCol); // children needs the tmp map
+    }
+  }
+
+  /**
+   * Reads one tuple by stitching the column group data together.
+   * <p>
+   * Does nothing when there is no stitch to run or no column group is
+   * registered. Otherwise the last stitch column is pointed at t, every
+   * registered column group is read, t is reset and dispatched again, and all
+   * stitches are executed in list order.
+   *
+   * @param t tuple that receives the stitched record
+   */
+  public void read(Tuple t) throws AssertionError, IOException, Exception {
+    final boolean nothingToStitch =
+        (mStitchSize == 0) || (mCGs == null) || mCGs.isEmpty();
+    if (nothingToStitch) {
+      return;
+    }
+
+    // dispatch the target tuple to the last stitch column
+    mExecs.get(mStitchSize - 1).setRecord(t);
+
+    // pull the data of every registered column group
+    for (CGEntry group : mCGs.values()) {
+      group.read();
+    }
+
+    // clear the target and dispatch it again before stitching
+    TypesUtils.resetTuple(t);
+    mExecs.get(mStitchSize - 1).setRecord(t);
+
+    for (int step = 0; step < mStitchSize; step++) {
+      mExecs.get(step).stitch();
+    }
+  }
+
+  /**
+   * Inserts one tuple: the tuple is split across the partitioned columns and
+   * each registered column group then inserts its part under the given key.
+   *
+   * @param key row key handed to every column group insert
+   * @param t   tuple to split and insert
+   * @throws AssertionError if there is no split to run or no column group is
+   *           registered
+   */
+  public void insert(final BytesWritable key, final Tuple t)
+      throws AssertionError, IOException, Exception {
+    final boolean nothingToSplit =
+        (mSplitSize == 0) || (mCGs == null) || mCGs.isEmpty();
+    if (nothingToSplit) {
+      throw new AssertionError("Empty Column Group List!");
+    }
+
+    // dispatch the source tuple to the first split column
+    mExecs.get(0).setRecord(t);
+
+    // start every MAP user from an empty map
+    for (int m = 0; m < mPCNeedMap.size(); m++) {
+      mPCNeedMap.get(m).clearMap();
+    }
+
+    for (int step = 0; step < mSplitSize; step++) {
+      mExecs.get(step).split();
+    }
+
+    // hand the split data to every registered column group
+    for (CGEntry group : mCGs.values()) {
+      group.insert(key);
+    }
+  }
+
+  /**
+   * sets the source tuple for the column group ops
+   */
+  public void setSource(Tuple[] tuples) throws ParseException {
+    if (tuples.length < mCGs.size())
+      throw new ParseException(
+          "Internal Logical Error: Invalid number of column groups");
+    for (int i = 0; i < tuples.length; i++) {
+      if (mCGs.get(i) != null) mCGs.get(i).setSource(tuples[i]);
+    }
+  }
+
+  /**
+   * Looks up the projection of one column group.
+   *
+   * @param cgindex column group index used as the key in mCGs
+   * @return the projection reported by the registered CGEntry, or null when
+   *         no entry is registered under cgindex
+   */
+  public String getProjection(int cgindex) throws ParseException {
+    final CGEntry registered = mCGs.get(cgindex);
+    return (registered == null) ? null : registered.getProjection();
+  }
+
+  /**
+   * Accessor for the table-level projection.
+   *
+   * @return the current value of the mProjection field
+   */
+  public Projection getProjection() {
+    return this.mProjection;
+  }
+
+  /**
+   * Delegates to PartitionInfo#getSplitMap for the given column.
+   *
+   * @param fs column schema to look up
+   * @return whatever mPartitionInfo.getSplitMap(fs) returns
+   */
+  public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
+      Schema.ColumnSchema fs) {
+    final HashSet<PartitionInfo.ColumnMappingEntry> splits =
+        mPartitionInfo.getSplitMap(fs);
+    return splits;
+  }
+
+  /**
+   * Delegates to PartitionInfo#generateDefaultCGSchema with the same
+   * arguments.
+   *
+   * @param compressor     passed through unchanged
+   * @param serializer     passed through unchanged
+   * @param defaultCGIndex passed through unchanged
+   * @return whatever the delegate returns
+   */
+  public CGSchema generateDefaultCGSchema(String compressor, String serializer,
+      final int defaultCGIndex) throws ParseException {
+    final CGSchema defaultSchema =
+        mPartitionInfo.generateDefaultCGSchema(compressor, serializer, defaultCGIndex);
+    return defaultSchema;
+  }
+
+  /**
+   * Delegates to PartitionInfo#setSplit with the same arguments, in the same
+   * order.
+   */
+  void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
+    this.mPartitionInfo.setSplit(
+        fs, st, cst,
+        name, childName,
+        splitChild);
+  }
+
+  boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
+    return mPartitionInfo.setCGIndex(fs, ri, fi, name);
+  }
+
+  PartitionInfo.ColumnMappingEntry getCGIndex(Schema.ColumnSchema fs) {
+    PartitionInfo.PartitionFieldInfo pi;
+    if ((pi = mPartitionInfo.fieldMap.get(fs)) != null) return pi.getCGIndex();
+    return null;
+  }
+
+  /**
+   * @param fs
+   * @return fully qualified name for a CG column
+   */
+  String getCGName(Schema.ColumnSchema fs) {
+    PartitionInfo.PartitionFieldInfo pi;
+    if ((pi = mPartitionInfo.fieldMap.get(fs)) != null) return pi.getCGName();
+    return null;
+  }
+  
+  /**
+   * Tells whether a column group has an entry registered in mCGs.
+   *
+   * @param i column group index
+   * @return true when mCGs contains the key i
+   */
+  public boolean isCGNeeded(int i)
+  {
+    final boolean registered = mCGs.containsKey(i);
+    return registered;
+  }
+}