Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/13 01:06:17 UTC

svn commit: r909667 [8/9] - in /hadoop/pig/branches/load-store-redesign/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/mapreduce/ src/ja...

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TestBasicTable;
+import org.apache.hadoop.zebra.mapreduce.RowTableSplit;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTfileSplit {
+	private static Configuration conf;
+	private static Path path;
+
+	@BeforeClass
+	public static void setUpOnce() throws IOException {
+		TestBasicTable.setUpOnce();
+		conf = TestBasicTable.conf;
+		path = new Path(TestBasicTable.rootPath, "TfileSplitTest");
+	}
+
+	@AfterClass
+	public static void tearDown() throws IOException {
+		BasicTable.drop(path, conf);
+	}
+
+	/* In this test, we create input splits for a projection on a non-existent column.
+	 * The first non-deleted column group should be used for the split. */
+	@Test
+	public void testTfileSplit1() 
+	throws IOException, ParseException {
+		BasicTable.drop(path, conf);
+		TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path, true);    
+
+		TableInputFormat inputFormat = new TableInputFormat();
+		Job job = new Job(conf);
+		inputFormat.setInputPaths(job, path);
+		inputFormat.setMinSplitSize(job, 100);
+		inputFormat.setProjection(job, "aa");
+		List<InputSplit> splits = inputFormat.getSplits(job);
+
+		RowTableSplit split = (RowTableSplit) splits.get(0);
+		String str = split.getSplit().toString();
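+		// The first line of the split description is assumed to end with the
+		// column group index followed by ';' (mirroring what RowTableSplit's
+		// toString() currently emits); tokenize it to pull that index out.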
+		StringTokenizer tokens = new StringTokenizer(str, "\n");
+		str = tokens.nextToken();
+		tokens = new StringTokenizer(str, " ");
+		tokens.nextToken();
+		tokens.nextToken();
+		String s = tokens.nextToken();
+		s = s.substring(0, s.length()-1);
+		int cgIndex = Integer.parseInt(s);
+		Assert.assertEquals(0, cgIndex);
+	}
+
+	/* In this test, we create input splits when dropped column groups are present.
+	 * The projection involves all columns and only one valid column group remains,
+	 * so that column group should be used for the split. */
+	@Test
+	public void testTfileSplit2() 
+	throws IOException, ParseException {    
+		BasicTable.drop(path, conf);
+		TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path, true);    
+		BasicTable.dropColumnGroup(path, conf, "CG0");
+		BasicTable.dropColumnGroup(path, conf, "CG2");
+
+		TableInputFormat inputFormat = new TableInputFormat();
+		Job job = new Job(conf);
+		inputFormat.setInputPaths(job, path);
+		inputFormat.setMinSplitSize(job, 100);
+		List<InputSplit> splits = inputFormat.getSplits(job);
+
+		RowTableSplit split = (RowTableSplit) splits.get( 0 );
+		String str = split.getSplit().toString(); 
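+		// Same toString() parse as in testTfileSplit1: extract the index of
+		// the column group backing this split.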
+		StringTokenizer tokens = new StringTokenizer(str, "\n");
+		str = tokens.nextToken();
+		tokens = new StringTokenizer(str, " ");
+		tokens.nextToken();
+		tokens.nextToken();
+		String s = tokens.nextToken();
+		s = s.substring(0, s.length()-1);
+		int cgIndex = Integer.parseInt(s);
+		Assert.assertEquals(1, cgIndex);
+	}
+
+	/* In this test, we create input splits when no valid column group is present.
+	 * getSplits() should return 0 splits. */
+	@Test
+	public void testTfileSplit3() 
+	throws IOException, ParseException {    
+		BasicTable.drop(path, conf);
+		TestBasicTable.createBasicTable(1, 100, "a, b, c, d, e, f", "[a, b]; [c, d]", null, path, true);    
+		BasicTable.dropColumnGroup(path, conf, "CG0");
+		BasicTable.dropColumnGroup(path, conf, "CG1");
+		BasicTable.dropColumnGroup(path, conf, "CG2");
+
+		TableInputFormat inputFormat = new TableInputFormat();
+		Job job = new Job(conf);
+		inputFormat.setInputPaths(job, path);
+		inputFormat.setMinSplitSize(job, 100);
+		List<InputSplit> splits = inputFormat.getSplits(job);
+
+		Assert.assertEquals(0, splits.size());
+	}
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,1097 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
+
+/**
+ * This is a complete sample MR program for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ * </pre>
+ */
+public class TestTypedApi {
+	static String inputPath;
+	static String inputFileName = "multi-input.txt";
+	protected static ExecType execType = ExecType.LOCAL;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	// private static Path pathWorking, pathTable1, path2, path3,
+	// pathTable4, pathTable5;
+	private static Configuration conf;
+	public static String sortKey = null;
+
+	private static FileSystem fs;
+
+	private static String zebraJar;
+	private static String whichCluster;
+	private static String multiLocs;
+	private static String strTable1 = null;
+	private static String strTable2 = null;
+	private static String strTable3 = null;
+
+	@BeforeClass
+	public static void setUpOnce() throws IOException {
+		if (System.getenv("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System
+			.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+
+		if (System.getProperty("whichCluster") == null) {
+			System.setProperty("whichCluster", "miniCluster");
+			System.out.println("should be called");
+			whichCluster = System.getProperty("whichCluster");
+		} else {
+			whichCluster = System.getProperty("whichCluster");
+		}
+
+		System.out.println("clusterddddd: " + whichCluster);
+		System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+		System.out.println(" get env user name: " + System.getenv("USER"));
+		if ((whichCluster.equalsIgnoreCase("realCluster") && System
+				.getenv("HADOOP_HOME") == null)) {
+			System.out.println("Please set HADOOP_HOME");
+			System.exit(0);
+		}
+
+		conf = new Configuration();
+
+		if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
+			System.out.println("Please set USER");
+			System.exit(0);
+		}
+		zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+		File file = new File(zebraJar);
+		if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put zebra.jar at hadoop_home/lib");
+			System.exit(0);
+		}
+
+		// set inputPath and output path
+		String workingDir = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			inputPath = new String("/user/" + System.getenv("USER") + "/"
+					+ inputFileName);
+			System.out.println("inputPath: " + inputPath);
+			multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
+					+ "/user/" + System.getenv("USER") + "/" + "japan");
+			fs = new Path(inputPath).getFileSystem(conf);
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
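+			// The working directory renders as a URI like "file:/some/dir";
+			// splitting on ':' keeps only the local path component.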
+			workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+			inputPath = new String(workingDir + "/" + inputFileName);
+			System.out.println("inputPath: " + inputPath);
+			multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
+					+ "india" + "," + workingDir + "/" + "japan");
+		}
+		writeToFile(inputPath);
+		// check inputPath existence
+		File inputFile = new File(inputPath);
+		if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put inputFile in hdfs: " + inputPath);
+			// System.exit(0);
+		}
+		if (!inputFile.exists() && whichCluster.equalsIgnoreCase("miniCluster")) {
+			System.out
+			.println("Please put inputFile under workingdir. working dir is : "
+					+ workingDir);
+			System.exit(0);
+		}
+
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+					.toProperties(conf));
+			pigServer.registerJar(zebraJar);
+
+		}
+
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			if (execType == ExecType.MAPREDUCE) {
+				cluster = MiniCluster.buildCluster();
+				pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+				fs = cluster.getFileSystem();
+
+			} else {
+				pigServer = new PigServer(ExecType.LOCAL);
+			}
+		}
+	}
+
+	@AfterClass
+	public static void tearDownOnce() throws Exception {
+		pigServer.shutdown();
+		if (strTable1 != null) {
+			BasicTable.drop(new Path(strTable1), conf);
+		}
+		if (strTable2 != null) {
+			BasicTable.drop(new Path(strTable2), conf);
+		}
+		if (strTable3 != null) {
+			BasicTable.drop(new Path(strTable3), conf);
+		}
+	}
+
+	public String getCurrentMethodName() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintWriter pw = new PrintWriter(baos);
+		(new Throwable()).printStackTrace(pw);
+		pw.flush();
+		String stackTrace = baos.toString();
+		pw.close();
+
+		StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+		tok.nextToken(); // 'java.lang.Throwable'
+		tok.nextToken(); // 'at ...getCurrentMethodName'
+		String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+		// Parse line 3
+		tok = new StringTokenizer(l.trim(), " <(");
+		String t = tok.nextToken(); // 'at'
+		t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+		StringTokenizer st = new StringTokenizer(t, ".");
+		String methodName = null;
+		while (st.hasMoreTokens()) {
+			methodName = st.nextToken();
+		}
+		return methodName;
+	}
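+	// A shorter sketch of the same idea (assuming the standard stack trace
+	// layout, where element 2 is this helper's caller) would be:
+	//   return Thread.currentThread().getStackTrace()[2].getMethodName();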
+
+	public static void writeToFile(String inputFile) throws IOException {
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			FileWriter fstream = new FileWriter(inputFile);
+			BufferedWriter out = new BufferedWriter(fstream);
+			out.write("us 2\n");
+			out.write("japan 2\n");
+			out.write("india 4\n");
+			out.write("us 2\n");
+			out.write("japan 1\n");
+			out.write("india 3\n");
+			out.write("nouse 5\n");
+			out.write("nowhere 4\n");
+			out.close();
+		}
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			FSDataOutputStream fout = fs.create(new Path(inputFile));
+			fout.writeBytes("us 2\n");
+			fout.writeBytes("japan 2\n");
+			fout.writeBytes("india 4\n");
+			fout.writeBytes("us 2\n");
+			fout.writeBytes("japan 1\n");
+			fout.writeBytes("india 3\n");
+			fout.writeBytes("nouse 5\n");
+			fout.writeBytes("nowhere 4\n");
+			fout.close();
+		}
+	}
+
+	public Path generateOutPath(String currentMethod) {
+		Path outPath = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
+					+ currentMethod);
+		} else {
+			String workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+			outPath = new Path(workingDir + "/multiOutput/" + currentMethod);
+			System.out.println("output file: " + outPath.toString());
+		}
+		return outPath;
+	}
+
+	public void removeDir(Path outPath) throws IOException {
+		String command = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+			+ outPath.toString();
+		} else {
+			StringTokenizer st = new StringTokenizer(outPath.toString(), ":");
+			int count = 0;
+			String file = null;
+			while (st.hasMoreElements()) {
+				count++;
+				String token = st.nextElement().toString();
+				if (count == 2)
+					file = token;
+			}
+			command = "rm -rf " + file;
+		}
+		Runtime runtime = Runtime.getRuntime();
+		Process proc = runtime.exec(command);
+		int exitVal = -1;
+		try {
+			exitVal = proc.waitFor();
+		} catch (InterruptedException e) {
+			System.err.println(e);
+		}
+
+	}
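+	// Note: a shell-free alternative sketch would be fs.delete(outPath, true),
+	// i.e. a recursive delete through the FileSystem API, assuming 'fs' is
+	// bound to the FileSystem that owns outPath.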
+
+	public static void getTablePaths(String myMultiLocs) {
+		StringTokenizer st = new StringTokenizer(myMultiLocs, ",");
+
+		// get how many tokens inside st object
+		System.out.println("tokens count: " + st.countTokens());
+		int count = 0;
+
+		// iterate st object to get more tokens from it
+		while (st.hasMoreElements()) {
+			count++;
+			String token = st.nextElement().toString();
+			if (count == 1)
+				strTable1 = token;
+			if (count == 2)
+				strTable2 = token;
+			if (count == 3)
+				strTable3 = token;
+			System.out.println("token = " + token);
+		}
+	}
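+	// For example, getTablePaths("/tmp/us,/tmp/others") leaves strTable1 =
+	// "/tmp/us" and strTable2 = "/tmp/others", with strTable3 left null.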
+
+	public static void checkTable(String myMultiLocs) throws IOException {
+		System.out.println("myMultiLocs:" + myMultiLocs);
+		System.out.println("sorgetTablePathst key:" + sortKey);
+
+		getTablePaths(myMultiLocs);
+		String query1 = null;
+		String query2 = null;
+
+		if (strTable1 != null) {
+
+			query1 = "records1 = LOAD '" + strTable1
+			+ "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		}
+		if (strTable2 != null) {
+			query2 = "records2 = LOAD '" + strTable2
+			+ "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		}
+
+		int count1 = 0;
+		int count2 = 0;
+
+		if (query1 != null) {
+			System.out.println(query1);
+			pigServer.registerQuery(query1);
+			Iterator<Tuple> it = pigServer.openIterator("records1");
+			while (it.hasNext()) {
+				count1++;
+				Tuple RowValue = it.next();
+				System.out.println(RowValue);
+				// test 1 us table
+				if (query1.contains("test1") || query1.contains("test2")
+						|| query1.contains("test3")) {
+
+					if (count1 == 1) {
+						Assert.assertEquals("us", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count1 == 2) {
+						Assert.assertEquals("us", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+				} // test1, test2
+
+			}// while
+			if (query1.contains("test1") || query1.contains("test2")
+					|| query1.contains("test3")) {
+				Assert.assertEquals(2, count1);
+			}
+		}// if query1 != null
+
+		if (query2 != null) {
+			pigServer.registerQuery(query2);
+			Iterator<Tuple> it = pigServer.openIterator("records2");
+
+			while (it.hasNext()) {
+				count2++;
+				Tuple RowValue = it.next();
+				System.out.println(RowValue);
+
+				// if test1 other table
+				if (query2.contains("test1")) {
+					if (count2 == 1) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+
+					if (count2 == 5) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+				}// if test1
+				// if test2 other table
+				if (query2.contains("test2")) {
+					if (count2 == 1) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+
+					if (count2 == 5) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+				}// if test2
+				// if test3 other table
+				if (query2.contains("test3")) {
+					if (count2 == 1) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 5) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+
+				}// if test3
+
+			}// while
+			if (query2.contains("test1") || query2.contains("test2")
+					|| query2.contains("test3")) {
+				Assert.assertEquals(6, count2);
+			}
+		}// if query2 != null
+
+	}
+
+	@Test
+	public void test1() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Positive test case: schema, projection, and sortInfo are all valid.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+		checkTable(myMultiLocs);
+		System.out.println("DONE test " + getCurrentMethodName());
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test2() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: wrong schema format: schema = "{, count:int".
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "{, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = IOException.class)
+	public void test3() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: non-existent sort key.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "not exist";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = IOException.class)
+	public void test4() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: sort key is an empty string.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void test5() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: sort key is null.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = null;
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test6() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: storage hint references a non-existent column.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[none-exist-column]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test7() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: wrong storage hint format (missing '[').
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "none-exist-column]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test8() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: schema defines more columns than the input file;
+		 * user input has only two fields.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int,word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test9() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: data type defined in schema is wrong ("inttt"
+		 * instead of "int").
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:inttt";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+
+	}
+
+	@Test(expected = ParseException.class)
+	public void test10() throws ParseException, IOException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		/*
+		 * Negative test case: schema format is wrong; fields are separated by
+		 * ';' instead of ','.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(1);
+
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string; count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+		System.out.println("done test 10");
+	}
+
+	static class MapClass extends
+	Mapper<LongWritable, Text, BytesWritable, ZebraTuple> {
+		private BytesWritable bytesKey;
+		private ZebraTuple tupleRow;
+		private Object javaObj;
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+		throws IOException, InterruptedException {
+			// value should contain "word count"
+			String[] wdct = value.toString().split(" ");
+			if (wdct.length != 2) {
+				// LOG the error
+				return;
+			}
+
+			byte[] word = wdct[0].getBytes();
+			bytesKey.set(word, 0, word.length);
+			System.out.println("word: " + new String(word));
+			tupleRow.set(0, new String(word));
+			tupleRow.set(1, Integer.parseInt(wdct[1]));
+			System.out.println("count:  " + Integer.parseInt(wdct[1]));
+
+			// This key has to be created by user
+			/*
+			 * Tuple userKey = new DefaultTuple(); userKey.append(new String(word));
+			 * userKey.append(Integer.parseInt(wdct[1]));
+			 */
+			System.out.println("in map, sortkey: " + sortKey);
+			ZebraTuple userKey = new ZebraTuple();
+			if (sortKey.equalsIgnoreCase("word,count")) {
+				userKey.append(new String(word));
+				userKey.append(Integer.parseInt(wdct[1]));
+			}
+
+			if (sortKey.equalsIgnoreCase("count")) {
+				userKey.append(Integer.parseInt(wdct[1]));
+			}
+
+			if (sortKey.equalsIgnoreCase("word")) {
+				userKey.append(new String(word));
+			}
+
+			try {
+
+				/* New M/R Interface */
+				/* Converts user key to zebra BytesWritable key */
+				/* using sort key expr tree */
+				/* Returns a java base object */
+				/* Done for each user key */
+
+				bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey);
+			} catch (Exception e) {
+				// Surface the failure rather than silently writing a stale key.
+				throw new RuntimeException(e);
+			}
+
+			context.write(bytesKey, tupleRow);
+		}
+
+		@Override
+		public void setup(Context context) {
+			bytesKey = new BytesWritable();
+			sortKey = context.getConfiguration().get("sortKey");
+			try {
+				Schema outSchema = BasicTableOutputFormat.getSchema(context);
+				tupleRow = (ZebraTuple) TypesUtils.createTuple(outSchema);
+				javaObj = BasicTableOutputFormat.getSortKeyGenerator(context);
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			} catch (org.apache.hadoop.zebra.parser.ParseException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+	}
+
+	static class ReduceClass extends
+	Reducer<BytesWritable, Tuple, BytesWritable, Tuple> {
+		@Override
+		public void reduce(BytesWritable key, Iterable<Tuple> values, Context context)
+		throws IOException, InterruptedException {
+			// Identity reduce: forward each (key, tuple) pair unchanged. The
+			// new-API Reducer passes values as an Iterable; this signature is
+			// required for the override to take effect.
+			for (Tuple value : values) {
+				context.write(key, value);
+			}
+		}
+
+	}
+
+	static class OutputPartitionerClass extends ZebraOutputPartition {
+
+		@Override
+		public int getOutputPartition(BytesWritable key, Tuple value)
+		throws IndexOutOfBoundsException, ExecException {
+
+			System.out.println("value0 : " + value.get(0));
+			String reg = null;
+			try {
+				reg = (String) (value.get(0));
+			} catch (Exception e) {
+				//
+			}
+
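+			// Rows whose first column is "us" go to paths[0] (the "us" table);
+			// all other words go to paths[1] (the "others" table).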
+			if (reg.equals("us"))
+				return 0;
+			else
+				return 1;
+		}
+
+	}
+
+	public static final class MemcmpRawComparator implements
+	RawComparator<Object>, Serializable {
+		@Override
+		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
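+			// Unsigned lexicographic (memcmp-style) comparison of the raw bytes.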
+			return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+		}
+
+		@Override
+		public int compare(Object o1, Object o2) {
+
+			throw new RuntimeException("Object comparison not supported");
+		}
+	}
+
+	public void runMR(String sortKey, String schema, String storageHint,
+			Path... paths) throws ParseException, IOException, Exception,
+			org.apache.hadoop.zebra.parser.ParseException {
+
+		Job job = new Job();
+		job.setJobName("TestTypedAPI");
+		Configuration conf = job.getConfiguration();
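+		// Gzip-compress the output TFiles, and pass the sort key through the
+		// job configuration so that MapClass.setup() can read it back.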
+		conf.set("table.output.tfile.compression", "gz");
+		conf.set("sortKey", sortKey);
+		// input settings
+		job.setInputFormatClass(TextInputFormat.class);
+		job.setMapperClass(TestTypedApi.MapClass.class);
+		job.setMapOutputKeyClass(BytesWritable.class);
+		job.setMapOutputValueClass(ZebraTuple.class);
+		FileInputFormat.setInputPaths(job, inputPath);
+
+		// output settings
+
+		job.setOutputFormatClass(BasicTableOutputFormat.class);
+
+		BasicTableOutputFormat.setMultipleOutputs(job,
+				TestTypedApi.OutputPartitionerClass.class, paths);
+		ZebraSchema zSchema = ZebraSchema.createZebraSchema(schema);
+		ZebraStorageHint zStorageHint = ZebraStorageHint
+		.createZebraStorageHint(storageHint);
+		ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortKey, null);
+		BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint,
+				zSortInfo);
+		System.out.println("in runMR, sortkey: " + sortKey);
+
+		job.setNumReduceTasks(1);
+		job.submit();
+		job.waitForCompletion( true );
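+		// close() finalizes the output tables once the job has completed.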
+		BasicTableOutputFormat.close( job );
+	}
+
+	public static void main(String[] args) throws ParseException,
+	org.apache.hadoop.zebra.parser.ParseException, Exception {
+		TestTypedApi test = new TestTypedApi();
+		TestTypedApi.setUpOnce();
+		test.test1();
+		test.test2();
+		test.test3();
+		test.test4();
+		test.test5();
+		test.test6();
+		test.test7();
+		test.test8();
+		test.test9();
+		test.test10();
+	}
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java?rev=909667&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTypedApi2.java Sat Feb 13 00:06:15 2010
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraProjection;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
+import org.apache.hadoop.zebra.mapreduce.TestBasicTableIOFormatLocalFS.InvIndex;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * This is a complete sample MR program for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ * </pre>
+ */
+public class TestTypedApi2 {
+	static String inputPath;
+	static String inputFileName = "multi-input.txt";
+	protected static ExecType execType = ExecType.LOCAL;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	// private static Path pathWorking, pathTable1, path2, path3,
+	// pathTable4, pathTable5;
+	private static Configuration conf;
+	public static String sortKey = null;
+
+	private static FileSystem fs;
+
+	private static String zebraJar;
+	private static String whichCluster;
+	private static String multiLocs;
+	private static String strTable1 = null;
+	private static String strTable2 = null;
+	private static String strTable3 = null;
+
+	@BeforeClass
+	public static void setUpOnce() throws IOException {
+		if (System.getenv("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System
+			.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+
+		if (System.getProperty("whichCluster") == null) {
+			System.setProperty("whichCluster", "miniCluster");
+			System.out.println("should be called");
+			whichCluster = System.getProperty("whichCluster");
+		} else {
+			whichCluster = System.getProperty("whichCluster");
+		}
+
+		System.out.println("clusterddddd: " + whichCluster);
+		System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME"));
+		System.out.println(" get env user name: " + System.getenv("USER"));
+		if ((whichCluster.equalsIgnoreCase("realCluster") && System
+				.getenv("HADOOP_HOME") == null)) {
+			System.out.println("Please set HADOOP_HOME");
+			System.exit(0);
+		}
+
+		conf = new Configuration();
+
+		if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) {
+			System.out.println("Please set USER");
+			System.exit(0);
+		}
+		zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar";
+
+		File file = new File(zebraJar);
+		if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put zebra.jar at hadoop_home/lib");
+			System.exit(0);
+		}
+
+		// set inputPath and output path
+		String workingDir = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			inputPath = new String("/user/" + System.getenv("USER") + "/"
+					+ inputFileName);
+			System.out.println("inputPath: " + inputPath);
+			multiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ "," + "/user/" + System.getenv("USER") + "/" + "india" + ","
+					+ "/user/" + System.getenv("USER") + "/" + "japan");
+			fs = new Path(inputPath).getFileSystem(conf);
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+			inputPath = new String(workingDir + "/" + inputFileName);
+			System.out.println("inputPath: " + inputPath);
+			multiLocs = new String(workingDir + "/" + "us" + "," + workingDir + "/"
+					+ "india" + "," + workingDir + "/" + "japan");
+		}
+		writeToFile(inputPath);
+		// check inputPath existence
+		File inputFile = new File(inputPath);
+		if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+			System.out.println("Please put inputFile in hdfs: " + inputPath);
+			// System.exit(0);
+		}
+		if (!inputFile.exists() && whichCluster.equalsIgnoreCase("miniCluster")) {
+			System.out
+			.println("Please put inputFile under workingdir. working dir is : "
+					+ workingDir);
+			System.exit(0);
+		}
+
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+					.toProperties(conf));
+			pigServer.registerJar(zebraJar);
+
+		}
+
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			if (execType == ExecType.MAPREDUCE) {
+				cluster = MiniCluster.buildCluster();
+				pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+				fs = cluster.getFileSystem();
+
+			} else {
+				pigServer = new PigServer(ExecType.LOCAL);
+			}
+		}
+	}
+
+	public String getCurrentMethodName() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintWriter pw = new PrintWriter(baos);
+		(new Throwable()).printStackTrace(pw);
+		pw.flush();
+		String stackTrace = baos.toString();
+		pw.close();
+
+		StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+		tok.nextToken(); // 'java.lang.Throwable'
+		tok.nextToken(); // 'at ...getCurrentMethodName'
+		String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+		// Parse line 3
+		tok = new StringTokenizer(l.trim(), " <(");
+		String t = tok.nextToken(); // 'at'
+		t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+		StringTokenizer st = new StringTokenizer(t, ".");
+		String methodName = null;
+		while (st.hasMoreTokens()) {
+			methodName = st.nextToken();
+		}
+		return methodName;
+	}
+
+	public static void writeToFile(String inputFile) throws IOException {
+		if (whichCluster.equalsIgnoreCase("miniCluster")) {
+			FileWriter fstream = new FileWriter(inputFile);
+			BufferedWriter out = new BufferedWriter(fstream);
+			out.write("us 2\n");
+			out.write("japan 2\n");
+			out.write("india 4\n");
+			out.write("us 2\n");
+			out.write("japan 1\n");
+			out.write("india 3\n");
+			out.write("nouse 5\n");
+			out.write("nowhere 4\n");
+			out.close();
+		}
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			FSDataOutputStream fout = fs.create(new Path(inputFile));
+			fout.writeBytes("us 2\n");
+			fout.writeBytes("japan 2\n");
+			fout.writeBytes("india 4\n");
+			fout.writeBytes("us 2\n");
+			fout.writeBytes("japan 1\n");
+			fout.writeBytes("india 3\n");
+			fout.writeBytes("nouse 5\n");
+			fout.writeBytes("nowhere 4\n");
+			fout.close();
+		}
+	}
+
+	public Path generateOutPath(String currentMethod) {
+		Path outPath = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			outPath = new Path("/user/" + System.getenv("USER") + "/multiOutput/"
+					+ currentMethod);
+		} else {
+			String workingDir = fs.getWorkingDirectory().toString().split(":")[1];
+			outPath = new Path(workingDir + "/multiOutput/" + currentMethod);
+			System.out.println("output file: " + outPath.toString());
+		}
+		return outPath;
+	}
+
+	public void removeDir(Path outPath) throws IOException {
+		String command = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr "
+			+ outPath.toString();
+		} else {
+			StringTokenizer st = new StringTokenizer(outPath.toString(), ":");
+			int count = 0;
+			String file = null;
+			while (st.hasMoreElements()) {
+				count++;
+				String token = st.nextElement().toString();
+				if (count == 2)
+					file = token;
+			}
+			command = "rm -rf " + file;
+		}
+		Runtime runtime = Runtime.getRuntime();
+		Process proc = runtime.exec(command);
+		int exitVal = -1;
+		try {
+			exitVal = proc.waitFor();
+		} catch (InterruptedException e) {
+			System.err.println(e);
+		}
+
+	}
+
+	public static void getTablePaths(String myMultiLocs) {
+		StringTokenizer st = new StringTokenizer(myMultiLocs, ",");
+
+		// get how many tokens inside st object
+		System.out.println("tokens count: " + st.countTokens());
+		int count = 0;
+
+		// iterate st object to get more tokens from it
+		while (st.hasMoreElements()) {
+			count++;
+			String token = st.nextElement().toString();
+			if (count == 1)
+				strTable1 = token;
+			if (count == 2)
+				strTable2 = token;
+			if (count == 3)
+				strTable3 = token;
+			System.out.println("token = " + token);
+		}
+	}
+
+	public static void checkTable(String myMultiLocs) throws IOException {
+		System.out.println("myMultiLocs:" + myMultiLocs);
+		System.out.println("sorgetTablePathst key:" + sortKey);
+
+		getTablePaths(myMultiLocs);
+		String query1 = null;
+		String query2 = null;
+
+		if (strTable1 != null) {
+
+			query1 = "records1 = LOAD '" + strTable1
+			+ "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		}
+		if (strTable2 != null) {
+			query2 = "records2 = LOAD '" + strTable2
+			+ "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		}
+
+		int count1 = 0;
+		int count2 = 0;
+
+		if (query1 != null) {
+			System.out.println(query1);
+			pigServer.registerQuery(query1);
+			Iterator<Tuple> it = pigServer.openIterator("records1");
+			while (it.hasNext()) {
+				count1++;
+				Tuple RowValue = it.next();
+				System.out.println(RowValue);
+				// the "us" table for test1/test2/test3
+				if (query1.contains("test1") || query1.contains("test2")
+						|| query1.contains("test3")) {
+
+					if (count1 == 1) {
+						Assert.assertEquals("us", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count1 == 2) {
+						Assert.assertEquals("us", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+				} // test1, test2, test3
+
+			}// while
+			if (query1.contains("test1") || query1.contains("test2")
+					|| query1.contains("test3")) {
+				Assert.assertEquals(2, count1);
+			}
+		}// if query1 != null
+
+		if (query2 != null) {
+			pigServer.registerQuery(query2);
+			Iterator<Tuple> it = pigServer.openIterator("records2");
+
+			while (it.hasNext()) {
+				count2++;
+				Tuple RowValue = it.next();
+				System.out.println(RowValue);
+
+				// if test1 other table
+				if (query2.contains("test1")) {
+					if (count2 == 1) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+
+					if (count2 == 5) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+				}// if test1
+				// if test2 other table
+				if (query2.contains("test2")) {
+					if (count2 == 1) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+
+					if (count2 == 5) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+				}// if test2
+				// if test3 other table
+				if (query2.contains("test3")) {
+					if (count2 == 1) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(1, RowValue.get(1));
+					}
+					if (count2 == 2) {
+						Assert.assertEquals("japan", RowValue.get(0));
+						Assert.assertEquals(2, RowValue.get(1));
+					}
+					if (count2 == 3) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(3, RowValue.get(1));
+					}
+					if (count2 == 4) {
+						Assert.assertEquals("india", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 5) {
+						Assert.assertEquals("nowhere", RowValue.get(0));
+						Assert.assertEquals(4, RowValue.get(1));
+					}
+					if (count2 == 6) {
+						Assert.assertEquals("nouse", RowValue.get(0));
+						Assert.assertEquals(5, RowValue.get(1));
+					}
+
+				}// if test3
+
+			}// while
+			if (query2.contains("test1") || query2.contains("test2")
+					|| query2.contains("test3")) {
+				Assert.assertEquals(6, count2);
+			}
+		}// if query2 != null
+
+	}
+
+	@Test
+	public void test1() throws Exception {
+		/*
+		 * Positive test case: sort on "word,count" using a user-defined
+		 * comparator class.
+		 */
+		System.out.println("******Starttt  testcase: " + getCurrentMethodName());
+		List<Path> paths = new ArrayList<Path>(3);
+		sortKey = "word,count";
+		System.out.println("hello sort on word and count");
+		String methodName = getCurrentMethodName();
+		String myMultiLocs = null;
+		if (whichCluster.equalsIgnoreCase("realCluster")) {
+			myMultiLocs = new String("/user/" + System.getenv("USER") + "/" + "us"
+					+ methodName + "," + "/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName);
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "us" + methodName)));
+			paths.add(new Path(new String("/user/" + System.getenv("USER") + "/"
+					+ "others" + methodName)));
+		} else {
+			RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+			fs = new LocalFileSystem(rawLFS);
+			myMultiLocs = new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName + "," + fs.getWorkingDirectory() + "/" + "others"
+					+ methodName);
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "us"
+					+ methodName)));
+			paths.add(new Path(new String(fs.getWorkingDirectory() + "/" + "others"
+					+ methodName)));
+		}
+		getTablePaths(myMultiLocs);
+		removeDir(new Path(strTable1));
+		removeDir(new Path(strTable2));
+		String schema = "word:string, count:int";
+		String storageHint = "[word];[count]";
+		runMR(sortKey, schema, storageHint, paths.toArray(new Path[2]));
+		// runMR( sortKey, schema, storageHint, myMultiLocs);
+		checkTable(myMultiLocs);
+		System.out.println("DONE test " + getCurrentMethodName());
+
+	}
+
+	static class MapClass extends
+	Mapper<LongWritable, Text, BytesWritable, ZebraTuple> {
+		private BytesWritable bytesKey;
+		private ZebraTuple tupleRow;
+		private Object javaObj;
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+		throws IOException, InterruptedException {
+			// value should contain "word count"
+			String[] wdct = value.toString().split(" ");
+			if (wdct.length != 2) {
+				// malformed input line (expected "word count"); log and skip it
+				return;
+			}
+
+			byte[] word = wdct[0].getBytes();
+			bytesKey.set(word, 0, word.length);
+			System.out.println("word: " + new String(word));
+			tupleRow.set(0, new String(word));
+			tupleRow.set(1, Integer.parseInt(wdct[1]));
+			System.out.println("count:  " + Integer.parseInt(wdct[1]));
+
+			// The sort key has to be created by the user, for example:
+			/*
+			 * Tuple userKey = new DefaultTuple(); userKey.append(new String(word));
+			 * userKey.append(Integer.parseInt(wdct[1]));
+			 */
+			System.out.println("in map, sortkey: " + sortKey);
+			ZebraTuple userKey = new ZebraTuple();
+			if (sortKey.equalsIgnoreCase("word,count")) {
+				userKey.append(new String(word));
+				userKey.append(Integer.parseInt(wdct[1]));
+			}
+
+			if (sortKey.equalsIgnoreCase("count")) {
+				userKey.append(Integer.parseInt(wdct[1]));
+			}
+
+			if (sortKey.equalsIgnoreCase("word")) {
+				userKey.append(new String(word));
+			}
+
+			try {
+
+				/*
+				 * New M/R interface: converts the user key tuple to a zebra
+				 * BytesWritable key using the sort-key expression tree held by
+				 * the generator (javaObj) obtained in setup(); done once per
+				 * user key.
+				 */
+
+				bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey);
+			} catch (Exception e) {
+				// fail loudly instead of silently swallowing a sort-key
+				// conversion error
+				throw new IOException("getSortKey failed", e);
+			}
+
+			context.write(bytesKey, tupleRow);
+		}
+
+		@Override
+		public void setup(Context context) {
+			bytesKey = new BytesWritable();
+			sortKey = context.getConfiguration().get("sortKey");
+			try {
+				Schema outSchema = BasicTableOutputFormat.getSchema(context);
+				tupleRow = (ZebraTuple) TypesUtils.createTuple(outSchema);
+				javaObj = BasicTableOutputFormat.getSortKeyGenerator(context);
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			} catch (org.apache.hadoop.zebra.parser.ParseException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+	}
+
+	static class ReduceClass extends
+	Reducer<BytesWritable, Tuple, BytesWritable, Tuple> {
+
+		// Identity reduce: pass every value through under its key. Note the
+		// new-API signature takes Iterable, not Iterator; with Iterator this
+		// method would never override reduce() and the default reducer would
+		// silently run instead.
+		@Override
+		public void reduce(BytesWritable key, Iterable<Tuple> values, Context context)
+		throws IOException, InterruptedException {
+			for (Tuple value : values) {
+				context.write(key, value);
+			}
+		}
+
+	}
+
+	static class OutputPartitionerClass extends ZebraOutputPartition {
+		@Override
+		public int getOutputPartition(BytesWritable key, Tuple value)
+		throws IndexOutOfBoundsException, ExecException {
+
+			System.out.println("value0 : " + value.get(0));
+			String reg = null;
+			try {
+				reg = (String) (value.get(0));
+			} catch (Exception e) {
+				//
+			}
+
+			if (reg.equals("us"))
+				return 0;
+			else
+				return 1;
+		}
+
+	}
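+
+	/*
+	 * The index returned by getOutputPartition() selects among the Paths
+	 * passed to BasicTableOutputFormat.setMultipleOutputs() in runMR():
+	 * 0 maps to the first table ("us" + methodName), 1 to the second
+	 * ("others" + methodName).
+	 */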
+
+	public static final class MemcmpRawComparator implements
+	RawComparator<Object>, Serializable {
+		@Override
+		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+			return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+		}
+
+		@Override
+		public int compare(Object o1, Object o2) {
+
+			throw new RuntimeException("Object comparison not supported");
+		}
+	}
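+
+	/*
+	 * This comparator orders zebra keys by raw, unsigned byte-wise comparison
+	 * (memcmp semantics); for example, "japan" sorts before "us" because
+	 * 'j' (0x6a) < 'u' (0x75) in the first byte.
+	 */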
+
+	public void runMR(String sortKey, String schema, String storageHint,
+			Path... paths) throws Exception {
+
+		Job job = new Job();
+		job.setJobName("TestTypedAPI");
+		Configuration conf = job.getConfiguration();
+		conf.set("table.output.tfile.compression", "gz");
+		conf.set("sortKey", sortKey);
+		// input settings
+		job.setInputFormatClass(TextInputFormat.class);
+		job.setMapperClass(TestTypedApi2.MapClass.class);
+		job.setMapOutputKeyClass(BytesWritable.class);
+		job.setMapOutputValueClass(ZebraTuple.class);
+		FileInputFormat.setInputPaths(job, inputPath);
+
+		// TODO:
+		//job.setNumMapTasks(1);
+
+		// output settings
+
+		job.setOutputFormatClass(BasicTableOutputFormat.class);
+		BasicTableOutputFormat.setMultipleOutputs(job,
+				TestTypedApi2.OutputPartitionerClass.class, paths);
+
+		ZebraSchema zSchema = ZebraSchema.createZebraSchema(schema);
+		ZebraStorageHint zStorageHint = ZebraStorageHint
+		.createZebraStorageHint(storageHint);
+		ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(sortKey,
+				TestTypedApi2.MemcmpRawComparator.class);
+		BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint,
+				zSortInfo);
+		System.out.println("in runMR, sortkey: " + sortKey);
+
+		job.setNumReduceTasks(1);
+		job.submit();
+		job.waitForCompletion(true);
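+		// Note: zebra's BasicTableOutputFormat requires close() after the job
+		// completes to finalize the output tables.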
+		BasicTableOutputFormat.close(job);
+	}
+
+	public static void main(String[] args) throws Exception {
+		TestTypedApi2 test = new TestTypedApi2();
+		TestTypedApi2.setUpOnce();
+		test.test1();
+
+	}
+}