Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC

svn commit: r833166 [4/5] - in /hadoop/pig/trunk/contrib/zebra: ./ src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/ src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,552 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMergeJoinNegative {
+	
+	final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+	final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [m1#{a}]";
+	final static String STR_SCHEMA2 = "aa:int,bb:float,ee:string";
+	final static String STR_STORAGE2 = "[aa, bb]; [ee]";
+	final static String STR_SCHEMA3 = "a:string,b:int,c:float";
+	final static String STR_STORAGE3 = "[a, b]; [c]";
+	
+	static int fileId = 0;
+	
+	protected static ExecType execType = ExecType.MAPREDUCE;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	private static Path pathTable1;
+	private static Path pathTable2;
+	private static Path pathTable3;
+	private static Path pathTable4;
+	private static Configuration conf;
+	
+	@BeforeClass
+	public static void setUp() throws Exception {
+		if (System.getProperty("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+
+		if (execType == ExecType.MAPREDUCE) {
+			cluster = MiniCluster.buildCluster();
+			pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+		} else {
+			pigServer = new PigServer(ExecType.LOCAL);
+		}
+		
+		conf = new Configuration();
+		FileSystem fs = cluster.getFileSystem();
+		Path pathWorking = fs.getWorkingDirectory();
+		pathTable1 = new Path(pathWorking, "table1");
+		pathTable2 = new Path(pathWorking, "table2");
+		pathTable3 = new Path(pathWorking, "table3");
+		pathTable4 = new Path(pathWorking, "table4");
+		
+		// Create table1 data
+		Map<String, String> m1 = new HashMap<String, String>();
+		m1.put("a","m1-a");
+		m1.put("b","m1-b");
+		
+		Object[][] table1 = {
+			{5,		-3.25f,	1001L,	51e+2,	"Zebra",	new DataByteArray("Zebra"),		m1},
+			{-1,	3.25f,	1000L,	50e+2,	"zebra",	new DataByteArray("zebra"),		m1},
+			{1001,	100.0f,	1000L,	50e+2,	"apple",	new DataByteArray("apple"),		m1},
+			{1002,	28.0f,	1000L,	50e+2,	"hadoop",	new DataByteArray("hadoop"),	m1},
+			{1000,	0.0f,	1002L,	52e+2,	"apple",	new DataByteArray("apple"),		m1} };
+		
+		// Create table1
+		createTable(pathTable1, STR_SCHEMA1, STR_STORAGE1, table1);
+		
+		// Create table2 data
+		Map<String, String> m2 = new HashMap<String, String>();
+		m2.put("a","m2-a");
+		m2.put("b","m2-b");
+		
+		Object[][] table2 = {
+			{15,	56.0f,	1004L,	50e+2,	"green",	new DataByteArray("green"),		m2},
+			{-1,	-99.0f,	1008L,	51e+2,	"orange",	new DataByteArray("orange"),	m2},
+			{1001,	0.0f,	1000L,	55e+2,	"white",	new DataByteArray("white"),		m2},
+			{1001,	-88.0f,	1001L,	52e+2,	"brown",	new DataByteArray("brown"),		m2},
+			{2000,	33.0f,	1002L,	52e+2,	"beige",	new DataByteArray("beige"),		m2} };
+		
+		// Create table2
+		createTable(pathTable2, STR_SCHEMA1, STR_STORAGE1, table2);
+		
+		// Create table3 data
+		Object[][] table3 = {
+			{0,		7.0f,	"grape"},
+			{1001,	8.0f,	"orange"},
+			{-200,	9.0f,	"banana"},
+			{8,		-88.0f,	"peach"} };
+		
+		// Create table3
+		createTable(pathTable3, STR_SCHEMA2, STR_STORAGE2, table3);
+		
+		// Create table4 data
+		Object[][] table4 = {
+			{"grape",	0,		7.0f},
+			{"orange",	1001,	8.0f},
+			{"banana",	-200,	9.0f},
+			{"peach",	8,		-88.0f} };
+		
+		// Create table4
+		createTable(pathTable4, STR_SCHEMA3, STR_STORAGE3, table4);
+		
+		// Load table1
+		String query1 = "table1 = LOAD '" + pathTable1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query1);
+		
+		// Load table2
+		String query2 = "table2 = LOAD '" + pathTable2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query2);
+		
+		// Load table3
+		String query3 = "table3 = LOAD '" + pathTable3.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query3);
+		
+		// Load table4
+		String query4 = "table4 = LOAD '" + pathTable4.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query4);
+	}
+	
+	private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData) throws IOException {
+		// Create table from tableData array
+		BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+		
+		Schema schema = writer.getSchema();
+		Tuple tuple = TypesUtils.createTuple(schema);
+		TableInserter inserter = writer.getInserter("ins", false);
+		
+		for (int i = 0; i < tableData.length; ++i) {
+			TypesUtils.resetTuple(tuple);
+			for (int k = 0; k < tableData[i].length; ++k) {
+				tuple.set(k, tableData[i][k]);
+				System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+			}
+			inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+		}
+		inserter.close();
+		writer.close();
+	}
+	
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pigServer.shutdown();
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_12() throws ExecException, IOException {
+		//
+		// Pig script changes position of join key (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Change position and name of join key
+		String reorder1 = "reorder1 = FOREACH records1 GENERATE b as n2, a as n1, c as c, d as d, e as e, f as f, m1 as m1;";
+		pigServer.registerQuery(reorder1);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN reorder1 BY " + "(" + "n2" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";    // n2 is b (a float) renamed, joined against the int key a
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_13() throws ExecException, IOException {
+		//
+		// Pig script changes sort order (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Change sort order for key
+		String reorder1 = "reorder1 = FOREACH records1 GENERATE a*(-1) as a, b as b, c as c, d as d, e as e, f as f, m1 as m1;";
+		pigServer.registerQuery(reorder1);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN reorder1 BY " + "(" + "a" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_14() throws ExecException, IOException {
+		//
+		// Left hand table is not in ascending order (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "b" + " ;";  // sort left hand table by wrong key
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_15() throws ExecException, IOException {
+		//
+		// Right hand table is not in ascending order (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "b" + " ;";  // sort right hand table by wrong key
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_16() throws ExecException, IOException {
+		//
+		// More than two input tables (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		String orderby3 = "sort3 = ORDER table3 BY " + "aa" + " ;";
+		pigServer.registerQuery(orderby3);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort3 = pathTable3.toString() + Integer.toString(fileId);
+		pigServer.store("sort3", pathSort3, TableStorer.class.getCanonicalName() +
+			"('[aa, bb]; [ee]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		String load3 = "records3 = LOAD '" + pathSort3 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('aa, bb, ee', 'sorted');";
+		pigServer.registerQuery(load3);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" , records3 BY " + "("+ "aa" + ")" + " USING \"merge\";";  // merge three tables
+		pigServer.registerQuery(join);
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_25() throws ExecException, IOException {
+		//
+		// Two tables do not have common join key (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby3 = "sort3 = ORDER table3 BY " + "aa" + " ;";
+		pigServer.registerQuery(orderby3);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort3 = pathTable3.toString() + Integer.toString(fileId);
+		pigServer.store("sort3", pathSort3, TableStorer.class.getCanonicalName() +
+			"('[aa, bb]; [ee]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load3 = "records3 = LOAD '" + pathSort3 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('aa, bb, ee', 'sorted');";
+		pigServer.registerQuery(load3);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records3 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";					// sort key a does not exist for records3
+		pigServer.registerQuery(join);
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_26() throws ExecException, IOException {
+		//
+		// Two tables do not have common join key (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby3 = "sort3 = ORDER table3 BY " + "aa" + " ;";
+		pigServer.registerQuery(orderby3);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort3 = pathTable3.toString() + Integer.toString(fileId);
+		pigServer.store("sort3", pathSort3, TableStorer.class.getCanonicalName() +
+			"('[aa, bb]; [ee]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load3 = "records3 = LOAD '" + pathSort3 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('aa, bb, ee', 'sorted');";
+		pigServer.registerQuery(load3);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "aa" + ")" + " , records3 BY " + "("+ "aa" + ")" +
+			" USING \"merge\";";					// sort key aa does not exist for records1
+		pigServer.registerQuery(join);
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_27() throws ExecException, IOException {
+		//
+		// Two tables do not have common join key (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby3 = "sort3 = ORDER table3 BY " + "aa" + " ;";
+		pigServer.registerQuery(orderby3);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort3 = pathTable3.toString() + Integer.toString(fileId);
+		pigServer.store("sort3", pathSort3, TableStorer.class.getCanonicalName() +
+			"('[aa, bb]; [ee]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load3 = "records3 = LOAD '" + pathSort3 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('aa, bb, ee', 'sorted');";
+		pigServer.registerQuery(load3);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "aaa" + ")" + " , records3 BY " + "("+ "aaa" + ")" +
+			" USING \"merge\";";					// sort key aaa does not exist for records1 and records3
+		pigServer.registerQuery(join);
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_28() throws ExecException, IOException {
+		//
+		// Two table key names are the same but the data types are different (negative test)
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";  // a is an int in table1
+		pigServer.registerQuery(orderby1);
+		
+		String orderby4 = "sort4 = ORDER table4 BY " + "a" + " ;";  // a is string
+		pigServer.registerQuery(orderby4);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort4 = pathTable4.toString() + Integer.toString(fileId);
+		pigServer.store("sort4", pathSort4, TableStorer.class.getCanonicalName() +
+			"('[a, b]; [c]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load4 = "records4 = LOAD '" + pathSort4 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c', 'sorted');";
+		pigServer.registerQuery(load4);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records4 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";					// join key a is an int in records1 but a string in records4
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+}
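
For contrast with the negative cases above, here is a minimal sketch of the valid merge-join flow that each test breaks in exactly one way. It assumes the same PigServer and table setup as this class; the "_ok" path suffix is illustrative only and not part of the commit.

    // Hedged sketch: identical join key, same position, ascending order on
    // both sides, and exactly two inputs (the conditions the tests violate).
    pigServer.registerQuery("sort1 = ORDER table1 BY a;");
    pigServer.registerQuery("sort2 = ORDER table2 BY a;");
    pigServer.store("sort1", pathTable1 + "_ok", TableStorer.class.getCanonicalName()
        + "('[a, b, c]; [d, e, f, m1]')");
    pigServer.store("sort2", pathTable2 + "_ok", TableStorer.class.getCanonicalName()
        + "('[a, b, c]; [d, e, f, m1]')");
    pigServer.registerQuery("records1 = LOAD '" + pathTable1 + "_ok' USING "
        + "org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');");
    pigServer.registerQuery("records2 = LOAD '" + pathTable2 + "_ok' USING "
        + "org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');");
    pigServer.registerQuery(
        "joinRecords = JOIN records1 BY (a), records2 BY (a) USING \"merge\";");
    Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // no IOException expected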

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMergeJoinPartial {
+	
+	final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+	final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [m1#{a}]";
+	
+	static int fileId = 0;
+	
+	protected static ExecType execType = ExecType.MAPREDUCE;
+	private static MiniCluster cluster;
+	protected static PigServer pigServer;
+	private static Path pathTable1;
+	private static Path pathTable2;
+	private static Configuration conf;
+	
+	private static Object[][] table1;
+	private static Object[][] table2;
+	
+	@BeforeClass
+	public static void setUp() throws Exception {
+		if (System.getProperty("hadoop.log.dir") == null) {
+			String base = new File(".").getPath(); // getAbsolutePath();
+			System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+		}
+
+		if (execType == ExecType.MAPREDUCE) {
+			cluster = MiniCluster.buildCluster();
+			pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+		} else {
+			pigServer = new PigServer(ExecType.LOCAL);
+		}
+		
+		conf = new Configuration();
+		FileSystem fs = cluster.getFileSystem();
+		Path pathWorking = fs.getWorkingDirectory();
+		pathTable1 = new Path(pathWorking, "table1");
+		pathTable2 = new Path(pathWorking, "table2");
+		
+		// Create table1 data
+		Map<String, String> m1 = new HashMap<String, String>();
+		m1.put("a","m1-a");
+		m1.put("b","m1-b");
+		
+		table1 = new Object[][]{
+			{5,		-3.25f,	1001L,	51e+2,	"Zebra",	new DataByteArray("Zebra"),		m1},
+			{-1,	3.25f,	1000L,	50e+2,	"zebra",	new DataByteArray("zebra"),		m1},
+			{1001,	100.0f,	1003L,	50e+2,	"Apple",	new DataByteArray("Apple"),		m1},
+			{1001,	101.0f,	1001L,	50e+2,	"apple",	new DataByteArray("apple"),		m1},
+			{1001,	50.0f,	1000L,	50e+2,	"Pig",		new DataByteArray("Pig"),		m1},
+			{1001,	52.0f,	1001L,	50e+2,	"pig",		new DataByteArray("pig"),		m1},
+			{1002,	28.0f,	1000L,	50e+2,	"Hadoop",	new DataByteArray("Hadoop"),	m1},
+			{1000,	0.0f,	1002L,	52e+2,	"hadoop",	new DataByteArray("hadoop"),	m1} };
+		
+		// Create table1
+		createTable(pathTable1, STR_SCHEMA1, STR_STORAGE1, table1);
+		
+		// Create table2 data
+		Map<String, String> m2 = new HashMap<String, String>();
+		m2.put("a","m2-a");
+		m2.put("b","m2-b");
+		
+		table2 = new Object[][] {
+			{15,	56.0f,	1004L,	50e+2,	"green",	new DataByteArray("green"),		m2},
+			{-1,	-99.0f,	1002L,	51e+2,	"orange",	new DataByteArray("orange"),	m2},
+			{1001,	100.0f,	1003L,	55e+2,	"white",	new DataByteArray("white"),		m2},
+			{1001,	102.0f,	1001L,	52e+2,	"purple",	new DataByteArray("purple"),	m2},
+			{1001,	50.0f,	1008L,	52e+2,	"gray",		new DataByteArray("gray"),		m2},
+			{1001,	53.0f,	1001L,	52e+2,	"brown",	new DataByteArray("brown"),		m2},
+			{2000,	33.0f,	1006L,	52e+2,	"beige",	new DataByteArray("beige"),		m2} };
+		
+		// Create table2
+		createTable(pathTable2, STR_SCHEMA1, STR_STORAGE1, table2);
+		
+		// Load table1
+		String query1 = "table1 = LOAD '" + pathTable1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query1);
+		
+		// Load table2
+		String query2 = "table2 = LOAD '" + pathTable2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+		pigServer.registerQuery(query2);
+		
+	}
+	
+	private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+			throws IOException {
+		//
+		// Create table from tableData array
+		//
+		BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+		
+		Schema schema = writer.getSchema();
+		Tuple tuple = TypesUtils.createTuple(schema);
+		TableInserter inserter = writer.getInserter("ins", false);
+		
+		for (int i = 0; i < tableData.length; ++i) {
+			TypesUtils.resetTuple(tuple);
+			for (int k = 0; k < tableData[i].length; ++k) {
+				tuple.set(k, tableData[i][k]);
+				System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+			}
+			inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+		}
+		inserter.close();
+		writer.close();
+	}
+	
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pigServer.shutdown();
+	}
+	
+	
+	@Test
+	public void test_merge_joint_17() throws ExecException, IOException {
+		//
+		// Merge join on the full sort key (a, b, c); key order is honored
+		//
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a,b,c" + ")" + " , records2 BY " + "("+ "a,b,c" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		// Verify merged tables
+		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+		
+		addResultRow(resultTable, table1[2], table2[2]);  // set expected values for row1
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");
+		verifyTable(resultTable, it);
+	}
+	
+	@Test
+	public void test_merge_joint_22() throws ExecException, IOException {
+		//
+		// Merge join on a leading prefix (a, b) of the sort key (a, b, c)
+		//
+		// Known bug with partial key join
+		// - Need to add verification to this test once bug is fixed
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a,b" + ")" + " , records2 BY " + "("+ "a,b" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		printTable("joinRecords");
+	}
+	
+	@Test
+	public void test_merge_joint_23() throws ExecException, IOException {
+		//
+		// Merge join on a leading prefix (a) of the sort key (a, b, c)
+		//
+		// Known bug with partial key join
+		// - Need to add verification to this test once bug is fixed
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a" + ")" + " , records2 BY " + "("+ "a" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		printTable("joinRecords");
+	}
+	
+	@Test(expected = IOException.class)
+	public void test_merge_joint_24() throws ExecException, IOException {
+		//
+		// Merge join on (a, c), which is not a leading prefix of the sort key (a, b, c)
+		// (negative test)
+		
+		// Sort tables
+		String orderby1 = "sort1 = ORDER table1 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby1);
+		
+		String orderby2 = "sort2 = ORDER table2 BY " + "a,b,c" + " ;";
+		pigServer.registerQuery(orderby2);
+		
+		// Store sorted tables
+		++fileId;  // increment filename suffix
+		String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+		pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+		pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+			"('[a, b, c]; [d, e, f, m1]')");
+		
+		// Load sorted tables
+		String load1 = "records1 = LOAD '" + pathSort1 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load1);
+		
+		String load2 = "records2 = LOAD '" + pathSort2 +
+			"' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+		pigServer.registerQuery(load2);
+		
+		// Merge tables
+		String join = "joinRecords = JOIN records1 BY " + "(" + "a,c" + ")" + " , records2 BY " + "("+ "a,c" + ")" +
+			" USING \"merge\";";
+		pigServer.registerQuery(join);
+		
+		Iterator<Tuple> it = pigServer.openIterator("joinRecords");  // get iterator to trigger error
+	}
+	
+	public void printTable(String tablename) throws IOException {
+		//
+		// Print Pig Table (for debugging)
+		//
+		Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+		Tuple RowValue1 = null;
+		while (it1.hasNext()) {
+			RowValue1 = it1.next();
+			System.out.println();
+			
+			for (int i = 0; i < RowValue1.size(); ++i) {
+				System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i));
+				
+			}
+		}
+	}
+	
+	public void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object[] leftRow, Object[] rightRow) {
+		//
+		// Add a row to expected results table
+		//
+		ArrayList<Object> resultRow = new ArrayList<Object>();
+		
+		for (int i=0; i<leftRow.length; ++i)
+			resultRow.add(leftRow[i]);
+		for (int i=0; i<rightRow.length; ++i)
+			resultRow.add(rightRow[i]);
+		
+		resultTable.add(resultRow);
+	}
+	
+	public void verifyTable(ArrayList<ArrayList<Object>> resultTable, Iterator<Tuple> it) throws IOException {
+		//
+		// Compare the expected results table against the rows the test returned
+		//
+		Tuple RowValues;
+		int rowIndex = 0;
+		
+		while (it.hasNext()) {
+			RowValues = it.next();
+			ArrayList<Object> resultRow = resultTable.get(rowIndex);
+			Assert.assertEquals(resultRow.size(), RowValues.size());  // verify expected tuple count
+			System.out.println();
+			
+			for (int i = 0; i < RowValues.size(); ++i) {
+				System.out.println("DEBUG: resultTable " + " RowValue.get(" + i + ") = " + RowValues.get(i) +
+						" " + resultRow.get(i));
+				Assert.assertEquals(resultRow.get(i), RowValues.get(i));  // verify each row value
+			}
+			++rowIndex;
+		}
+		Assert.assertEquals(resultTable.size(), rowIndex);  // verify expected row count
+	}
+	
+}
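
The rule these partial-key tests probe, condensed: with both inputs sorted by (a, b, c), the data stays ordered for any leading prefix of that key, so only prefix join keys can be merge-joined. A sketch mapping keys to the tests above ("prefix" is this note's wording, not the commit's):

    // Both inputs sorted by (a, b, c):
    //   JOIN r1 BY (a, b, c), r2 BY (a, b, c) USING "merge";  // full key:     test_merge_joint_17
    //   JOIN r1 BY (a, b),    r2 BY (a, b)    USING "merge";  // prefix:       test_merge_joint_22
    //   JOIN r1 BY (a),       r2 BY (a)       USING "merge";  // prefix:       test_merge_joint_23
    //   JOIN r1 BY (a, c),    r2 BY (a, c)    USING "merge";  // not a prefix: test_merge_joint_24 (IOException)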

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Thu Nov  5 21:02:57 2009
@@ -93,7 +93,7 @@
     System.out.println("path =" + path);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
 
     Schema schema = writer.getSchema();
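
The two-line hunk above (and the identical hunks in TestSimpleType.java and TestTableLoader.java below) is the same mechanical call-site update: BasicTable.Writer loses one constructor argument. A before/after sketch; reading the removed boolean as the old sorted flag is an assumption, since this diff does not show the constructor itself:

    // Before this commit (five-argument form; the boolean is assumed to be
    // the old sorted flag):
    //   BasicTable.Writer writer =
    //       new BasicTable.Writer(path, STR_SCHEMA, STR_STORAGE, false, conf);
    // After this commit (four-argument form):
    BasicTable.Writer writer =
        new BasicTable.Writer(path, STR_SCHEMA, STR_STORAGE, conf);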

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Thu Nov  5 21:02:57 2009
@@ -100,7 +100,7 @@
     System.out.println("path =" + path);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     Schema schema = writer.getSchema();
 
     BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestSortedTableUnion {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Return the name of the routine that called getCurrentMethodName
+   * 
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use pig LOAD to load testing data for store
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    /*
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(10, row0);
+    */
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    /*
+     * Use pig STORE to store testing data BasicTable.Writer writer = new
+     * BasicTable.Writer(pathTable, "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+     * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf);
+     */
+    Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"1",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"2",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "1,"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query4);
+
+    // check the content of the sorted table union
+    Iterator<Tuple> it3 = pigServer.openIterator("records2");
+    int row = 0, index;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(7, RowValue3.size());
+      row++;
+      index = (row-1)/2;
+      Assert.assertEquals(index+"_01", RowValue3.get(1));
+      Assert.assertEquals(index+"_00", RowValue3.get(0));
+      Assert.assertEquals(index+"_06", RowValue3.get(6));
+      Assert.assertEquals(index+"_05", RowValue3.get(5));
+      Assert.assertEquals(index+"_04", RowValue3.get(4));
+      Assert.assertEquals(index+"_03", RowValue3.get(3));
+      Assert.assertEquals(index+"_02", RowValue3.get(2));
+    }
+    Assert.assertEquals(20, row);
+  }
+}
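
Why the loop above uses index = (row-1)/2 and expects 20 rows: the two stored tables are identical sorted copies, so the sorted union yields each of the 10 distinct keys twice, back to back. A standalone sketch of the expected sequence:

    // Expected row order for a 2-way sorted union of identical 10-row tables.
    for (int row = 1; row <= 20; ++row) {
        int index = (row - 1) / 2;  // 0,0,1,1, ... ,9,9
        System.out.println("row " + row + " -> fields " + index + "_00 .. " + index + "_06");
    }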

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestSortedTableUnionMergeJoin {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Return the name of the routine that called getCurrentMethodName
+   * 
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use pig LOAD to load testing data for store
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    /*
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(10, row0);
+    */
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    /*
+     * Use pig STORE to store testing data BasicTable.Writer writer = new
+     * BasicTable.Writer(pathTable, "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+     * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf);
+     */
+    Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"1",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"2",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query3);
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "1,"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query4);
+
+    String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+
+    // check the content of the sorted table union (records2); joinRecords is registered above but not iterated here
+    Iterator<Tuple> it3 = pigServer.openIterator("records2");
+    int row = 0, index;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(7, RowValue3.size());
+      row++;
+      index = (row-1)/2;
+      Assert.assertEquals(index+"_01", RowValue3.get(1));
+      Assert.assertEquals(index+"_00", RowValue3.get(0));
+      Assert.assertEquals(index+"_06", RowValue3.get(6));
+      Assert.assertEquals(index+"_05", RowValue3.get(5));
+      Assert.assertEquals(index+"_04", RowValue3.get(4));
+      Assert.assertEquals(index+"_03", RowValue3.get(3));
+      Assert.assertEquals(index+"_02", RowValue3.get(2));
+    }
+    Assert.assertEquals(20, row);
+  }
+}
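
The shape of this test, condensed: records1 is a single sorted table, records2 is a sorted union of two copies, and the merge join puts the union on one side. A sketch of the three statements that matter, with 'p1' and 'p2' as placeholder paths (the real test derives them from getCurrentMethodName()); note that the assertions then iterate records2, the union, rather than joinRecords:

    // Sketch: merge join where one input is a sorted table union.
    pigServer.registerQuery("records1 = LOAD 'p1' USING "
        + "org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');");
    pigServer.registerQuery("records2 = LOAD 'p1,p2' USING "  // comma-separated paths form a union
        + "org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');");
    pigServer.registerQuery(
        "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";");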

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java Thu Nov  5 21:02:57 2009
@@ -76,7 +76,7 @@
     System.out.println("pathTable =" + pathTable);
 
     BasicTable.Writer writer = new BasicTable.Writer(pathTable,
-        "a:string,b,c:string,d,e,f,g", "[a,b,c];[d,e,f,g]", false, conf);
+        "a:string,b,c:string,d,e,f,g", "[a,b,c];[d,e,f,g]", conf);
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);
 

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestTableMergeJoin {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // could also use getAbsolutePath()
+      System.setProperty("hadoop.log.dir",
+          new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns the name of the routine that called getCurrentMethodName.
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use Pig LOAD to read the test data that will be re-stored below.
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    /*
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(10, row0);
+    */
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    /*
+     * Use Pig STORE to store the test data. (This replaces writing the table
+     * directly via BasicTable.Writer(pathTable,
+     * "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+     * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf).)
+     */
+    Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"1",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query3);
+
+    /*
+    Iterator<Tuple> it3 = pigServer.openIterator("records1");
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(2, RowValue3.size());
+      row++;
+      if (row == 10) {
+        Assert.assertEquals("9_01", RowValue3.get(1));
+        Assert.assertEquals("9_00", RowValue3.get(0));
+      }
+    }
+    Assert.assertEquals(10, row);
+    */
+    
+    /*
+     * Table2 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"2",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+    String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+    // check JOIN content
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(14, RowValue3.size());
+      row++;
+      if (row == 10) {
+        Assert.assertEquals("9_01", RowValue3.get(1));
+        Assert.assertEquals("9_00", RowValue3.get(0));
+        Assert.assertEquals("9_01", RowValue3.get(8));
+        Assert.assertEquals("9_00", RowValue3.get(7));
+        Assert.assertEquals("9_06", RowValue3.get(6));
+        Assert.assertEquals("9_05", RowValue3.get(5));
+        Assert.assertEquals("9_04", RowValue3.get(4));
+        Assert.assertEquals("9_03", RowValue3.get(3));
+        Assert.assertEquals("9_02", RowValue3.get(2));
+      }
+    }
+    Assert.assertEquals(10, row);
+  }
+}
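
The test above is essentially the minimal Zebra merge-join recipe: ORDER the
data on the join key before STORE, then LOAD the left-hand input with the
'sorted' hint so Pig can run a merge join (in this test the right-hand
input, records2, is loaded without the hint and the join still yields all
10 rows per the assertions). Condensed, with outputPath as a hypothetical
stand-in for the getCurrentMethodName()-derived path:

    String outputPath = "testStorer";  // hypothetical
    pigServer.registerQuery("srecs = ORDER records BY SF_a;");
    pigServer.store("srecs", outputPath + "1",
        TableStorer.class.getCanonicalName()
            + "('[SF_a, SF_b, SF_c]; [SF_e]')");
    pigServer.registerQuery("records1 = LOAD '" + outputPath + "1' USING "
        + "org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');");
    pigServer.registerQuery(
        "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";");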

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestTableMergeJoinAfterFilter {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // could also use getAbsolutePath()
+      System.setProperty("hadoop.log.dir",
+          new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns the name of the routine that called getCurrentMethodName.
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use Pig LOAD to read the test data that will be re-stored below.
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    /*
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(10, row0);
+    */
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    /*
+     * Use Pig STORE to store the test data. (This replaces writing the table
+     * directly via BasicTable.Writer(pathTable,
+     * "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+     * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf).)
+     */
+    Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"1",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b', 'sorted');";
+    pigServer.registerQuery(query3);
+
+    /*
+     * Table2 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"2",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+    String filter = "records3 = FILTER records2 BY SF_a > '4';";
+    pigServer.registerQuery(filter);
+
+    Iterator<Tuple> it2 = pigServer.openIterator("records3");
+    int row2 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      Assert.assertEquals(7, RowValue2.size());
+      row2++;
+      if (row2 == 5) {
+        Assert.assertEquals("8_01", RowValue2.get(1));
+        Assert.assertEquals("8_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(6, row2);
+    
+    String join = "joinRecords = JOIN records1 BY SF_a, records3 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+    // check JOIN content
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(9, RowValue3.size());
+      row++;
+      if (row == 6) {
+        Assert.assertEquals("9_01", RowValue3.get(1));
+        Assert.assertEquals("9_00", RowValue3.get(0));
+        Assert.assertEquals("9_06", RowValue3.get(8));
+        Assert.assertEquals("9_05", RowValue3.get(7));
+        Assert.assertEquals("9_04", RowValue3.get(6));
+        Assert.assertEquals("9_03", RowValue3.get(5));
+        Assert.assertEquals("9_02", RowValue3.get(4));
+        Assert.assertEquals("9_01", RowValue3.get(3));
+        Assert.assertEquals("9_00", RowValue3.get(2));
+      }
+    }
+    Assert.assertEquals(6, row);
+  }
+}
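
Worth spelling out the row counts this test asserts: the FILTER compares
strings, and "4_00" > "4" holds lexicographically, so six of the ten keys
(4_00 through 9_00) survive the filter before the merge join runs -- hence
both assertEquals(6, ...) checks. The shape of the plan, condensed:

    pigServer.registerQuery("records3 = FILTER records2 BY SF_a > '4';");
    pigServer.registerQuery(
        "joinRecords = JOIN records1 BY SF_a, records3 BY SF_a USING \"merge\";");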

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java Thu Nov  5 21:02:57 2009
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ * 
+ */
+public class TestTableMergeJoinFloat {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // could also use getAbsolutePath()
+      System.setProperty("hadoop.log.dir",
+          new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    FileSystem fs = cluster.getFileSystem();
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:float,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    System.out.println("typeName" + schema.getColumn("SF_a").getType().pigDataType());
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            if (k == 0) {
+              Float f = 32.987f;
+              tuple.set(0, k + b - f);
+            } else {
+              tuple.set(k, b + "_" + i + "" + k);
+            }
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns the name of the routine that called getCurrentMethodName.
+   */
+  public String getCurrentMethodName() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    (new Throwable()).printStackTrace(pw);
+    pw.flush();
+    String stackTrace = baos.toString();
+    pw.close();
+
+    StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+    tok.nextToken(); // 'java.lang.Throwable'
+    tok.nextToken(); // 'at ...getCurrentMethodName'
+    String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+    // Parse line 3
+    tok = new StringTokenizer(l.trim(), " <(");
+    String t = tok.nextToken(); // 'at'
+    t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+    return t;
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    /*
+     * Use Pig LOAD to read the test data that will be re-stored below.
+     */
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    Path newPath = new Path(getCurrentMethodName());
+
+    /*
+     * Table1 creation
+     */
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"1",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b', 'sorted');";
+    pigServer.registerQuery(query3);
+    /*
+     * Table2 creation
+     */
+
+    pigServer
+        .store(
+            "srecs",
+            newPath.toString()+"2",
+            TableStorer.class.getCanonicalName()
+                + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+    String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+    // check JOIN content
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    int row = 0;
+    Tuple RowValue3 = null;
+    while (it3.hasNext()) {
+      // Last row value
+      RowValue3 = it3.next();
+      Assert.assertEquals(9, RowValue3.size());
+      row++;
+      if (row == 10) {
+        Assert.assertEquals("9_01", RowValue3.get(1));
+        Assert.assertEquals(-23.987f, RowValue3.get(0));
+        Assert.assertEquals("9_06", RowValue3.get(8));
+        Assert.assertEquals("9_05", RowValue3.get(7));
+        Assert.assertEquals("9_04", RowValue3.get(6));
+        Assert.assertEquals("9_03", RowValue3.get(5));
+        Assert.assertEquals("9_02", RowValue3.get(4));
+        Assert.assertEquals("9_01", RowValue3.get(3));
+        Assert.assertEquals(-23.987f, RowValue3.get(2));
+      }
+    }
+    Assert.assertEquals(10, row);
+  }
+}
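
The float variant above sorts and joins on a float key: with k == 0 the
inserted value is 0 + b - 32.987f, so the keys run from -32.987f up to
-23.987f, and the last joined row after ORDER BY carries -23.987f, which the
test compares as boxed Floats. When a float column comes out of arithmetic,
the delta-based JUnit overload is the safer idiom; a sketch (the 1e-6f
tolerance is an illustrative choice):

    // Exact boxed comparison, as the test does:
    Assert.assertEquals(-23.987f, RowValue3.get(0));
    // Tolerance-based alternative:
    Assert.assertEquals(-23.987f, (Float) RowValue3.get(0), 1e-6f);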