You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/11/05 22:03:00 UTC
svn commit: r833166 [4/5] - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/ src/java/org/apache/hadoop/zebra/io/
src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/pig/ src/java/org/apache/hadoop/zebra/pi...
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinNegative.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,552 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Negative tests for Pig merge joins over Zebra tables. Every test builds a
+ * merge join that is invalid in some way and expects an {@link IOException}.
+ */
+public class TestMergeJoinNegative {
+
+  final static String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+  final static String STR_STORAGE1 = "[a, b, c]; [e, f]; [m1#{a}]";
+  final static String STR_SCHEMA2 = "aa:int,bb:float,ee:string";
+  final static String STR_STORAGE2 = "[aa, bb]; [ee]";
+  final static String STR_SCHEMA3 = "a:string,b:int,c:float";
+  final static String STR_STORAGE3 = "[a, b]; [c]";
+
+  /** Column-group layout used when re-storing sorted copies of table1/table2. */
+  private static final String SORTED_STORAGE1 = "[a, b, c]; [d, e, f, m1]";
+  /** Column projection used when re-loading sorted copies of table1/table2. */
+  private static final String PROJECTION1 = "a, b, c, d, e, f, m1";
+
+  /** Suffix appended to output paths so each test writes to fresh locations. */
+  static int fileId = 0;
+
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable1;
+  private static Path pathTable2;
+  private static Path pathTable3;
+  private static Path pathTable4;
+  private static Configuration conf;
+
+  /**
+   * Starts Pig (MiniCluster in MAPREDUCE mode, local otherwise), writes four
+   * Zebra tables and registers them as aliases table1..table4.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // NOTE(review): with base "." this concatenation produces "../logs"
+      // (the parent directory) -- confirm that is intended.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    // cluster is only built in MAPREDUCE mode; use the local file system otherwise
+    // instead of dereferencing a null cluster.
+    FileSystem fs = (cluster != null) ? cluster.getFileSystem() : FileSystem.getLocal(conf);
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable1 = new Path(pathWorking, "table1");
+    pathTable2 = new Path(pathWorking, "table2");
+    pathTable3 = new Path(pathWorking, "table3");
+    pathTable4 = new Path(pathWorking, "table4");
+
+    // Create table1 data
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "m1-a");
+    m1.put("b", "m1-b");
+
+    Object[][] table1 = {
+        {5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1},
+        {-1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1},
+        {1001, 100.0f, 1000L, 50e+2, "apple", new DataByteArray("apple"), m1},
+        {1002, 28.0f, 1000L, 50e+2, "hadoop", new DataByteArray("hadoop"), m1},
+        {1000, 0.0f, 1002L, 52e+2, "apple", new DataByteArray("apple"), m1} };
+    createTable(pathTable1, STR_SCHEMA1, STR_STORAGE1, table1);
+
+    // Create table2 data
+    Map<String, String> m2 = new HashMap<String, String>();
+    m2.put("a", "m2-a");
+    m2.put("b", "m2-b");
+
+    Object[][] table2 = {
+        {15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2},
+        {-1, -99.0f, 1008L, 51e+2, "orange", new DataByteArray("orange"), m2},
+        {1001, 0.0f, 1000L, 55e+2, "white", new DataByteArray("white"), m2},
+        {1001, -88.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2},
+        {2000, 33.0f, 1002L, 52e+2, "beige", new DataByteArray("beige"), m2} };
+    createTable(pathTable2, STR_SCHEMA1, STR_STORAGE1, table2);
+
+    // Create table3 data (schema has no column named "a")
+    Object[][] table3 = {
+        {0, 7.0f, "grape"},
+        {1001, 8.0f, "orange"},
+        {-200, 9.0f, "banana"},
+        {8, -88.0f, "peach"} };
+    createTable(pathTable3, STR_SCHEMA2, STR_STORAGE2, table3);
+
+    // Create table4 data (column "a" is a string here, an int in table1)
+    Object[][] table4 = {
+        {"grape", 0, 7.0f},
+        {"orange", 1001, 8.0f},
+        {"banana", -200, 9.0f},
+        {"peach", 8, -88.0f} };
+    createTable(pathTable4, STR_SCHEMA3, STR_STORAGE3, table4);
+
+    registerTable("table1", pathTable1);
+    registerTable("table2", pathTable2);
+    registerTable("table3", pathTable3);
+    registerTable("table4", pathTable4);
+  }
+
+  /** Registers {@code alias} as an unsorted Zebra load of {@code path}. */
+  private static void registerTable(String alias, Path path) throws IOException {
+    pigServer.registerQuery(alias + " = LOAD '" + path.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();");
+  }
+
+  /**
+   * Writes {@code tableData} as a Zebra BasicTable at {@code path}, one row per
+   * array entry, keyed "key0", "key1", ...
+   * The inserter and writer are closed even if an insert fails.
+   */
+  private static void createTable(Path path, String schemaString, String storageString,
+      Object[][] tableData) throws IOException {
+    BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+    try {
+      Schema schema = writer.getSchema();
+      Tuple tuple = TypesUtils.createTuple(schema);
+      TableInserter inserter = writer.getInserter("ins", false);
+      try {
+        for (int i = 0; i < tableData.length; ++i) {
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tableData[i].length; ++k) {
+            tuple.set(k, tableData[i][k]);
+            System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+          }
+          inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        }
+      } finally {
+        inserter.close();
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Shuts down the Pig server if setUp got far enough to create it. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (pigServer != null) {
+      pigServer.shutdown();
+    }
+  }
+
+  /** Registers {@code sortAlias = ORDER tableAlias BY keys ;}. */
+  private static void orderBy(String sortAlias, String tableAlias, String keys) throws IOException {
+    pigServer.registerQuery(sortAlias + " = ORDER " + tableAlias + " BY " + keys + " ;");
+  }
+
+  /**
+   * Stores {@code alias} with TableStorer at {@code basePath} plus the current
+   * {@link #fileId} suffix.
+   *
+   * @return the output path that was written
+   */
+  private static String storeSorted(String alias, Path basePath, String storageHint)
+      throws IOException {
+    String outPath = basePath.toString() + Integer.toString(fileId);
+    pigServer.store(alias, outPath, TableStorer.class.getCanonicalName() + "('" + storageHint + "')");
+    return outPath;
+  }
+
+  /** Registers {@code alias} as a 'sorted' Zebra load of {@code path} with the given projection. */
+  private static void loadSorted(String alias, String path, String projection) throws IOException {
+    pigServer.registerQuery(alias + " = LOAD '" + path
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + projection + "', 'sorted');");
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_12() throws ExecException, IOException {
+    // Pig script changes position of join key (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort2", "table2", "a");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort2 = storeSorted("sort2", pathTable2, SORTED_STORAGE1);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records2", pathSort2, PROJECTION1);
+
+    // Change position and name of join key
+    pigServer.registerQuery(
+        "reorder1 = FOREACH records1 GENERATE b as n2, a as n1, c as c, d as d, e as e, f as f, m1 as m1;");
+
+    // n2 (originally b, a float) is joined against a (an int): wrong data type
+    pigServer.registerQuery("joinRecords = JOIN reorder1 BY (n2) , records2 BY (a) USING \"merge\";");
+
+    pigServer.openIterator("joinRecords"); // get iterator to trigger error
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_13() throws ExecException, IOException {
+    // Pig script changes sort order (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort2", "table2", "a");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort2 = storeSorted("sort2", pathTable2, SORTED_STORAGE1);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records2", pathSort2, PROJECTION1);
+
+    // Negating the key reverses its sort order
+    pigServer.registerQuery(
+        "reorder1 = FOREACH records1 GENERATE a*(-1) as a, b as b, c as c, d as d, e as e, f as f, m1 as m1;");
+
+    pigServer.registerQuery("joinRecords = JOIN reorder1 BY (a) , records2 BY (a) USING \"merge\";");
+
+    pigServer.openIterator("joinRecords"); // get iterator to trigger error
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_14() throws ExecException, IOException {
+    // Left hand table is not in ascending order (negative test)
+    orderBy("sort1", "table1", "b"); // sort left hand table by wrong key
+    orderBy("sort2", "table2", "a");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort2 = storeSorted("sort2", pathTable2, SORTED_STORAGE1);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records2", pathSort2, PROJECTION1);
+
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (a) , records2 BY (a) USING \"merge\";");
+
+    pigServer.openIterator("joinRecords"); // get iterator to trigger error
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_15() throws ExecException, IOException {
+    // Right hand table is not in ascending order (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort2", "table2", "b"); // sort right hand table by wrong key
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort2 = storeSorted("sort2", pathTable2, SORTED_STORAGE1);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records2", pathSort2, PROJECTION1);
+
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (a) , records2 BY (a) USING \"merge\";");
+
+    pigServer.openIterator("joinRecords"); // get iterator to trigger error
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_16() throws ExecException, IOException {
+    // More than two input tables (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort2", "table2", "a");
+    orderBy("sort3", "table3", "aa");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort2 = storeSorted("sort2", pathTable2, SORTED_STORAGE1);
+    String pathSort3 = storeSorted("sort3", pathTable3, STR_STORAGE2);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records2", pathSort2, PROJECTION1);
+    loadSorted("records3", pathSort3, "aa, bb, ee");
+
+    // merge three tables
+    pigServer.registerQuery(
+        "joinRecords = JOIN records1 BY (a) , records2 BY (a) , records3 BY (aa) USING \"merge\";");
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_25() throws ExecException, IOException {
+    // Two tables do not have common join key (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort3", "table3", "aa");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort3 = storeSorted("sort3", pathTable3, STR_STORAGE2);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records3", pathSort3, "aa, bb, ee");
+
+    // sort key a does not exist for records3
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (a) , records3 BY (a) USING \"merge\";");
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_26() throws ExecException, IOException {
+    // Two tables do not have common join key (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort3", "table3", "aa");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort3 = storeSorted("sort3", pathTable3, STR_STORAGE2);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records3", pathSort3, "aa, bb, ee");
+
+    // sort key aa does not exist for records1
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (aa) , records3 BY (aa) USING \"merge\";");
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_27() throws ExecException, IOException {
+    // Two tables do not have common join key (negative test)
+    orderBy("sort1", "table1", "a");
+    orderBy("sort3", "table3", "aa");
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort3 = storeSorted("sort3", pathTable3, STR_STORAGE2);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records3", pathSort3, "aa, bb, ee");
+
+    // sort key aaa does not exist for records1 and records3
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (aaa) , records3 BY (aaa) USING \"merge\";");
+  }
+
+  @Test(expected = IOException.class)
+  public void test_merge_joint_28() throws ExecException, IOException {
+    // Two table key names are the same but the data types are different (negative test)
+    orderBy("sort1", "table1", "a"); // a is int (STR_SCHEMA1)
+    orderBy("sort4", "table4", "a"); // a is string (STR_SCHEMA3)
+
+    ++fileId; // increment filename suffix
+    String pathSort1 = storeSorted("sort1", pathTable1, SORTED_STORAGE1);
+    String pathSort4 = storeSorted("sort4", pathTable4, STR_STORAGE3);
+
+    loadSorted("records1", pathSort1, PROJECTION1);
+    loadSorted("records4", pathSort4, "a, b, c");
+
+    // sort key a is different data type for records1 and records4
+    pigServer.registerQuery("joinRecords = JOIN records1 BY (a) , records4 BY (a) USING \"merge\";");
+
+    pigServer.openIterator("joinRecords"); // get iterator to trigger error
+  }
+
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMergeJoinPartial.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMergeJoinPartial {
+
+  // Schema and column-group layout shared by table1 and table2.
+  static final String STR_SCHEMA1 = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+  static final String STR_STORAGE1 = "[a, b, c]; [e, f]; [m1#{a}]";
+
+  // Suffix appended to output paths; each test bumps it before storing.
+  static int fileId = 0;
+
+  // Pig execution environment.
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+
+  // Locations of the source tables and the configuration used to write them.
+  private static Path pathTable1;
+  private static Path pathTable2;
+  private static Configuration conf;
+
+  // Rows written to table1/table2; kept so tests can build expected join output.
+  private static Object[][] table1;
+  private static Object[][] table2;
+
+  /**
+   * Starts Pig (MiniCluster in MAPREDUCE mode, local otherwise), writes table1
+   * and table2 as Zebra tables and registers them under those aliases.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // NOTE(review): with base "." this concatenation produces "../logs"
+      // (the parent directory) -- confirm that is intended.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    conf = new Configuration();
+    // cluster is only built in MAPREDUCE mode; use the local file system otherwise
+    // instead of dereferencing a null cluster.
+    FileSystem fs = (cluster != null) ? cluster.getFileSystem() : FileSystem.getLocal(conf);
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable1 = new Path(pathWorking, "table1");
+    pathTable2 = new Path(pathWorking, "table2");
+
+    // Create table1 data
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "m1-a");
+    m1.put("b", "m1-b");
+
+    table1 = new Object[][] {
+        {5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1},
+        {-1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1},
+        {1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1},
+        {1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1},
+        {1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1},
+        {1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1},
+        {1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"), m1},
+        {1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"), m1} };
+    createTable(pathTable1, STR_SCHEMA1, STR_STORAGE1, table1);
+
+    // Create table2 data
+    Map<String, String> m2 = new HashMap<String, String>();
+    m2.put("a", "m2-a");
+    m2.put("b", "m2-b");
+
+    table2 = new Object[][] {
+        {15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2},
+        {-1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"), m2},
+        {1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2},
+        {1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"), m2},
+        {1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2},
+        {1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2},
+        {2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2} };
+    createTable(pathTable2, STR_SCHEMA1, STR_STORAGE1, table2);
+
+    // Load table1
+    String query1 = "table1 = LOAD '" + pathTable1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query1);
+
+    // Load table2
+    String query2 = "table2 = LOAD '" + pathTable2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query2);
+  }
+
+  /**
+   * Writes {@code tableData} as a Zebra BasicTable at {@code path}, one row per
+   * array entry, keyed "key0", "key1", ...
+   * The inserter and writer are closed even if an insert fails.
+   */
+  private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+      throws IOException {
+    BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+    try {
+      Schema schema = writer.getSchema();
+      Tuple tuple = TypesUtils.createTuple(schema);
+      TableInserter inserter = writer.getInserter("ins", false);
+      try {
+        for (int i = 0; i < tableData.length; ++i) {
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tableData[i].length; ++k) {
+            tuple.set(k, tableData[i][k]);
+            System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+          }
+          inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        }
+      } finally {
+        inserter.close();
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Shuts down the Pig server if setUp got far enough to create it. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (pigServer != null) {
+      pigServer.shutdown();
+    }
+  }
+
+
+  @Test
+  public void test_merge_joint_17() throws ExecException, IOException {
+    // Merge join on the full sort key (a, b, c); rows must agree on all three columns.
+    final String storer = TableStorer.class.getCanonicalName() + "('[a, b, c]; [d, e, f, m1]')";
+    final String sortedLoader =
+        "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+
+    // Sort both inputs on the complete key.
+    pigServer.registerQuery("sort1 = ORDER table1 BY a,b,c ;");
+    pigServer.registerQuery("sort2 = ORDER table2 BY a,b,c ;");
+
+    // Persist the sorted copies under a fresh suffix.
+    fileId++;
+    String suffix = Integer.toString(fileId);
+    String sorted1 = pathTable1.toString() + suffix;
+    String sorted2 = pathTable2.toString() + suffix;
+    pigServer.store("sort1", sorted1, storer);
+    pigServer.store("sort2", sorted2, storer);
+
+    // Reload them, declaring the data sorted so a merge join is allowed.
+    pigServer.registerQuery("records1 = LOAD '" + sorted1 + sortedLoader);
+    pigServer.registerQuery("records2 = LOAD '" + sorted2 + sortedLoader);
+
+    pigServer.registerQuery(
+        "joinRecords = JOIN records1 BY (a,b,c) , records2 BY (a,b,c) USING \"merge\";");
+
+    // Only table1[2] and table2[2] share (a, b, c) = (1001, 100.0f, 1003L).
+    ArrayList<ArrayList<Object>> expected = new ArrayList<ArrayList<Object>>();
+    addResultRow(expected, table1[2], table2[2]);
+    verifyTable(expected, pigServer.openIterator("joinRecords"));
+  }
+
+  @Test
+  public void test_merge_joint_22() throws ExecException, IOException {
+    // Merge join on a key prefix (a, b) of the (a, b, c) sort order.
+    // Known bug with partial key join: output is only printed, not verified yet.
+    final String storer = TableStorer.class.getCanonicalName() + "('[a, b, c]; [d, e, f, m1]')";
+    final String sortedLoader =
+        "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+
+    pigServer.registerQuery("sort1 = ORDER table1 BY a,b,c ;");
+    pigServer.registerQuery("sort2 = ORDER table2 BY a,b,c ;");
+
+    fileId++;
+    String suffix = Integer.toString(fileId);
+    String sorted1 = pathTable1.toString() + suffix;
+    String sorted2 = pathTable2.toString() + suffix;
+    pigServer.store("sort1", sorted1, storer);
+    pigServer.store("sort2", sorted2, storer);
+
+    pigServer.registerQuery("records1 = LOAD '" + sorted1 + sortedLoader);
+    pigServer.registerQuery("records2 = LOAD '" + sorted2 + sortedLoader);
+
+    pigServer.registerQuery(
+        "joinRecords = JOIN records1 BY (a,b) , records2 BY (a,b) USING \"merge\";");
+
+    printTable("joinRecords");
+  }
+
+  @Test
+  public void test_merge_joint_23() throws ExecException, IOException {
+    // Merge join on the leading key column (a) of the (a, b, c) sort order.
+    // Known bug with partial key join: output is only printed, not verified yet.
+    final String storer = TableStorer.class.getCanonicalName() + "('[a, b, c]; [d, e, f, m1]')";
+    final String sortedLoader =
+        "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+
+    pigServer.registerQuery("sort1 = ORDER table1 BY a,b,c ;");
+    pigServer.registerQuery("sort2 = ORDER table2 BY a,b,c ;");
+
+    fileId++;
+    String suffix = Integer.toString(fileId);
+    String sorted1 = pathTable1.toString() + suffix;
+    String sorted2 = pathTable2.toString() + suffix;
+    pigServer.store("sort1", sorted1, storer);
+    pigServer.store("sort2", sorted2, storer);
+
+    pigServer.registerQuery("records1 = LOAD '" + sorted1 + sortedLoader);
+    pigServer.registerQuery("records2 = LOAD '" + sorted2 + sortedLoader);
+
+    pigServer.registerQuery(
+        "joinRecords = JOIN records1 BY (a) , records2 BY (a) USING \"merge\";");
+
+    printTable("joinRecords");
+  }
+
+  @Test
+  public void test_merge_joint_24() throws ExecException, IOException {
+    //
+    // Negative test: both tables are sorted on (a, b, c) but the merge join
+    // uses (a, c), which is not a prefix of the sort keys. Running the join
+    // must fail with an IOException.
+    //
+    // The expected failure is scoped to the join itself: an IOException from
+    // the ORDER/STORE/LOAD setup below propagates and is reported as a test
+    // error, instead of being mistaken for the expected exception.
+
+    // Sort tables
+    String orderby1 = "sort1 = ORDER table1 BY " + "a,b,c" + " ;";
+    pigServer.registerQuery(orderby1);
+
+    String orderby2 = "sort2 = ORDER table2 BY " + "a,b,c" + " ;";
+    pigServer.registerQuery(orderby2);
+
+    // Store sorted tables
+    ++fileId; // increment filename suffix
+    String pathSort1 = pathTable1.toString() + Integer.toString(fileId);
+    pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+        "('[a, b, c]; [d, e, f, m1]')");
+
+    String pathSort2 = pathTable2.toString() + Integer.toString(fileId);
+    pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+        "('[a, b, c]; [d, e, f, m1]')");
+
+    // Load sorted tables
+    String load1 = "records1 = LOAD '" + pathSort1 +
+        "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+    pigServer.registerQuery(load1);
+
+    String load2 = "records2 = LOAD '" + pathSort2 +
+        "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c, d, e, f, m1', 'sorted');";
+    pigServer.registerQuery(load2);
+
+    // Merge tables: only this part is expected to fail.
+    String join = "joinRecords = JOIN records1 BY " + "(" + "a,c" + ")" + " , records2 BY " + "("+ "a,c" + ")" +
+        " USING \"merge\";";
+    try {
+      pigServer.registerQuery(join);
+      pigServer.openIterator("joinRecords"); // opening the iterator runs the join
+      Assert.fail("merge join on keys (a,c), which are not a prefix of the sort keys (a,b,c), "
+          + "should have thrown an IOException");
+    } catch (IOException expected) {
+      // expected: the merge join keys must be a prefix of the sort keys
+    }
+  }
+
+  public void printTable(String tablename) throws IOException {
+    // Debug helper: dumps every field of every tuple of the given Pig alias
+    // to stdout, one field per line, with an empty line before each tuple.
+    for (Iterator<Tuple> rows = pigServer.openIterator(tablename); rows.hasNext(); ) {
+      Tuple row = rows.next();
+      System.out.println();
+      for (int col = 0; col < row.size(); col++) {
+        System.out.println("DEBUG: " + tablename + " RowValue.get(" + col + ") = " + row.get(col));
+      }
+    }
+  }
+
+  public void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object[] leftRow, Object[] rightRow) {
+    // Appends one expected join row to resultTable: all left-side values
+    // followed by all right-side values, each in their original order.
+    ArrayList<Object> combined = new ArrayList<Object>(leftRow.length + rightRow.length);
+    for (Object value : leftRow) {
+      combined.add(value);
+    }
+    for (Object value : rightRow) {
+      combined.add(value);
+    }
+    resultTable.add(combined);
+  }
+
+  public void verifyTable(ArrayList<ArrayList<Object>> resultTable, Iterator<Tuple> it) throws IOException {
+    //
+    // Compares the tuples produced by `it`, in order, against the expected rows
+    // in resultTable: same number of rows, same number of fields per row and
+    // equal field values. Each compared field is also printed for debugging.
+    //
+    int rowIndex = 0;
+
+    while (it.hasNext()) {
+      Tuple rowValues = it.next();
+      // Report surplus output rows as an assertion failure rather than an
+      // IndexOutOfBoundsException from resultTable.get().
+      Assert.assertTrue("unexpected extra row " + rowIndex + "; expected only "
+          + resultTable.size() + " rows", rowIndex < resultTable.size());
+      ArrayList<Object> resultRow = resultTable.get(rowIndex);
+      Assert.assertEquals("field count of row " + rowIndex, resultRow.size(), rowValues.size());
+      System.out.println();
+
+      for (int i = 0; i < rowValues.size(); ++i) {
+        System.out.println("DEBUG: resultTable " + " RowValue.get(" + i + ") = " + rowValues.get(i) +
+            " " + resultRow.get(i));
+        Assert.assertEquals("row " + rowIndex + ", field " + i, resultRow.get(i), rowValues.get(i));
+      }
+      ++rowIndex;
+    }
+    Assert.assertEquals("row count", resultTable.size(), rowIndex); // verify expected row count
+  }
+
+}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Thu Nov 5 21:02:57 2009
@@ -93,7 +93,7 @@
System.out.println("path =" + path);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSimpleType.java Thu Nov 5 21:02:57 2009
@@ -100,7 +100,7 @@
System.out.println("path =" + path);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
Schema schema = writer.getSchema();
BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests loading two sorted Zebra tables, both written from the same sorted
+ * relation, as a single sorted union.
+ *
+ * Note: when running this test from inside Eclipse, add
+ * build/pig-0.1.0-dev-core.jar to the classpath of the run/debug
+ * configuration.
+ */
+public class TestSortedTableUnion {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  /**
+   * Starts Pig (on a MiniCluster in MAPREDUCE mode) and writes the input
+   * table: 10 rows of 7 columns where column k of the row written in batch b
+   * holds the string "(9-b)_0k", so rows are written in descending key order.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // Append "/logs": the previous "./logs" suffix turned "." into "../logs",
+      // a directory outside the working directory.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    // The MiniCluster only exists in MAPREDUCE mode; use the local file system
+    // otherwise instead of dereferencing a null cluster.
+    FileSystem fs = (cluster != null) ? cluster.getFileSystem() : FileSystem.getLocal(conf);
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          // Let an ExecException propagate: skipping a column would silently
+          // write a partially filled row and make later assertions misleading.
+          tuple.set(k, (9 - b) + "_" + i + "" + k);
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns "ClassName.methodName" of the method that called
+   * getCurrentMethodName; used to derive per-test output paths.
+   */
+  public String getCurrentMethodName() {
+    // Frame 0 is this method; frame 1 is its caller.
+    StackTraceElement caller = new Throwable().getStackTrace()[1];
+    return caller.getClassName() + "." + caller.getMethodName();
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    // Load the input table written by setUp() and sort it on SF_a.
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    // Store the sorted relation twice, as two sorted Zebra tables.
+    Path newPath = new Path(getCurrentMethodName());
+    pigServer.store("srecs", newPath.toString() + "1",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+    pigServer.store("srecs", newPath.toString() + "2",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    // Load both tables together as one sorted union.
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "1,"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query4);
+
+    // Check UNION content: each key appears once per table and the rows come
+    // back in ascending key order, so 1-based rows 2j+1 and 2j+2 hold key j.
+    Iterator<Tuple> it3 = pigServer.openIterator("records2");
+    int row = 0, index;
+    Tuple rowValue = null;
+    while (it3.hasNext()) {
+      rowValue = it3.next();
+      Assert.assertEquals(7, rowValue.size());
+      row++;
+      index = (row - 1) / 2;
+      Assert.assertEquals(index + "_01", rowValue.get(1));
+      Assert.assertEquals(index + "_00", rowValue.get(0));
+      Assert.assertEquals(index + "_06", rowValue.get(6));
+      Assert.assertEquals(index + "_05", rowValue.get(5));
+      Assert.assertEquals(index + "_04", rowValue.get(4));
+      Assert.assertEquals(index + "_03", rowValue.get(3));
+      Assert.assertEquals(index + "_02", rowValue.get(2));
+    }
+    Assert.assertEquals(20, row);
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests a merge join whose right side is a sorted union of two Zebra tables
+ * written from the same sorted relation.
+ *
+ * Note: when running this test from inside Eclipse, add
+ * build/pig-0.1.0-dev-core.jar to the classpath of the run/debug
+ * configuration.
+ */
+public class TestSortedTableUnionMergeJoin {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  /**
+   * Starts Pig (on a MiniCluster in MAPREDUCE mode) and writes the input
+   * table: 10 rows of 7 columns where column k of the row written in batch b
+   * holds the string "(9-b)_0k", so rows are written in descending key order.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // Append "/logs": the previous "./logs" suffix turned "." into "../logs",
+      // a directory outside the working directory.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    // The MiniCluster only exists in MAPREDUCE mode; use the local file system
+    // otherwise instead of dereferencing a null cluster.
+    FileSystem fs = (cluster != null) ? cluster.getFileSystem() : FileSystem.getLocal(conf);
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          // Let an ExecException propagate: skipping a column would silently
+          // write a partially filled row and make later assertions misleading.
+          tuple.set(k, (9 - b) + "_" + i + "" + k);
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns "ClassName.methodName" of the method that called
+   * getCurrentMethodName; used to derive per-test output paths.
+   */
+  public String getCurrentMethodName() {
+    // Frame 0 is this method; frame 1 is its caller.
+    StackTraceElement caller = new Throwable().getStackTrace()[1];
+    return caller.getClassName() + "." + caller.getMethodName();
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    // Load the input table written by setUp() and sort it on SF_a.
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    // Store the sorted relation twice, as two sorted Zebra tables.
+    Path newPath = new Path(getCurrentMethodName());
+    pigServer.store("srecs", newPath.toString() + "1",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+    pigServer.store("srecs", newPath.toString() + "2",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    // Left side: table 1 alone. Right side: sorted union of tables 1 and 2.
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query3);
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "1,"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query4);
+
+    String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+
+    // Check UNION content: each key appears once per table and the rows come
+    // back in ascending key order, so 1-based rows 2j+1 and 2j+2 hold key j.
+    Iterator<Tuple> it3 = pigServer.openIterator("records2");
+    int row = 0, index;
+    Tuple rowValue = null;
+    while (it3.hasNext()) {
+      rowValue = it3.next();
+      Assert.assertEquals(7, rowValue.size());
+      row++;
+      index = (row - 1) / 2;
+      Assert.assertEquals(index + "_01", rowValue.get(1));
+      Assert.assertEquals(index + "_00", rowValue.get(0));
+      Assert.assertEquals(index + "_06", rowValue.get(6));
+      Assert.assertEquals(index + "_05", rowValue.get(5));
+      Assert.assertEquals(index + "_04", rowValue.get(4));
+      Assert.assertEquals(index + "_03", rowValue.get(3));
+      Assert.assertEquals(index + "_02", rowValue.get(2));
+    }
+    Assert.assertEquals(20, row);
+
+    // Check JOIN content (previously registered but never executed): each of
+    // the 10 left rows matches the two identical copies of its key on the
+    // right, giving 20 rows of 7 left + 7 right fields with equal halves.
+    Iterator<Tuple> itJoin = pigServer.openIterator("joinRecords");
+    int joinRows = 0;
+    while (itJoin.hasNext()) {
+      Tuple joined = itJoin.next();
+      Assert.assertEquals(14, joined.size());
+      for (int k = 0; k < 7; k++) {
+        Assert.assertEquals(joined.get(k), joined.get(k + 7));
+      }
+      joinRows++;
+    }
+    Assert.assertEquals(20, joinRows);
+  }
+}
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java?rev=833166&r1=833165&r2=833166&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java Thu Nov 5 21:02:57 2009
@@ -76,7 +76,7 @@
System.out.println("pathTable =" + pathTable);
BasicTable.Writer writer = new BasicTable.Writer(pathTable,
- "a:string,b,c:string,d,e,f,g", "[a,b,c];[d,e,f,g]", false, conf);
+ "a:string,b,c:string,d,e,f,g", "[a,b,c];[d,e,f,g]", conf);
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoin.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests a merge join of two sorted Zebra tables written from the same sorted
+ * relation.
+ *
+ * Note: when running this test from inside Eclipse, add
+ * build/pig-0.1.0-dev-core.jar to the classpath of the run/debug
+ * configuration.
+ */
+public class TestTableMergeJoin {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+
+  /**
+   * Starts Pig (on a MiniCluster in MAPREDUCE mode) and writes the input
+   * table: 10 rows of 7 columns where column k of the row written in batch b
+   * holds the string "(9-b)_0k", so rows are written in descending key order.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // Append "/logs": the previous "./logs" suffix turned "." into "../logs",
+      // a directory outside the working directory.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+
+    if (execType == ExecType.MAPREDUCE) {
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    } else {
+      pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    Configuration conf = new Configuration();
+    // The MiniCluster only exists in MAPREDUCE mode; use the local file system
+    // otherwise instead of dereferencing a null cluster.
+    FileSystem fs = (cluster != null) ? cluster.getFileSystem() : FileSystem.getLocal(conf);
+    Path pathWorking = fs.getWorkingDirectory();
+    pathTable = new Path(pathWorking, "TestTableStorer");
+    System.out.println("pathTable =" + pathTable);
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          // Let an ExecException propagate: skipping a column would silently
+          // write a partially filled row and make later assertions misleading.
+          tuple.set(k, (9 - b) + "_" + i + "" + k);
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  /**
+   * Returns "ClassName.methodName" of the method that called
+   * getCurrentMethodName; used to derive per-test output paths.
+   */
+  public String getCurrentMethodName() {
+    // Frame 0 is this method; frame 1 is its caller.
+    StackTraceElement caller = new Throwable().getStackTrace()[1];
+    return caller.getClassName() + "." + caller.getMethodName();
+  }
+
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    // Load the input table written by setUp() and sort it on SF_a.
+    String query = "records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query);
+
+    String orderby = "srecs = ORDER records BY SF_a;";
+    pigServer.registerQuery(orderby);
+
+    Path newPath = new Path(getCurrentMethodName());
+
+    // Table 1: store the sorted relation and load it back as a sorted table.
+    pigServer.store("srecs", newPath.toString() + "1",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query3 = "records1 = LOAD '"
+        + newPath.toString() + "1"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('', 'sorted');";
+    pigServer.registerQuery(query3);
+
+    // Table 2: store the same sorted relation again and load it back.
+    pigServer.store("srecs", newPath.toString() + "2",
+        TableStorer.class.getCanonicalName() + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+    String query4 = "records2 = LOAD '"
+        + newPath.toString() + "2"
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+    pigServer.registerQuery(query4);
+
+    String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+    pigServer.registerQuery(join);
+
+    // Check JOIN content: both sides hold the same 10 distinct keys, so each
+    // joined row is 7 left fields followed by the 7 identical right fields.
+    Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+    int row = 0;
+    Tuple rowValue = null;
+    while (it3.hasNext()) {
+      rowValue = it3.next();
+      Assert.assertEquals(14, rowValue.size());
+      for (int k = 0; k < 7; k++) {
+        // the left and right halves come from the same source row
+        Assert.assertEquals(rowValue.get(k), rowValue.get(k + 7));
+      }
+      row++;
+      if (row == 10) {
+        // the last row carries the largest key
+        Assert.assertEquals("9_01", rowValue.get(1));
+        Assert.assertEquals("9_00", rowValue.get(0));
+        Assert.assertEquals("9_01", rowValue.get(8));
+        Assert.assertEquals("9_00", rowValue.get(7));
+        Assert.assertEquals("9_06", rowValue.get(6));
+        Assert.assertEquals("9_05", rowValue.get(5));
+        Assert.assertEquals("9_04", rowValue.get(4));
+        Assert.assertEquals("9_03", rowValue.get(3));
+        Assert.assertEquals("9_02", rowValue.get(2));
+      }
+    }
+    Assert.assertEquals(10, row);
+  }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinAfterFilter.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ *
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the
+ * app/debug configuration, when run this from inside the Eclipse.
+ *
+ */
+public class TestTableMergeJoinAfterFilter {
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ private static Path pathTable;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ Configuration conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ Path pathWorking = fs.getWorkingDirectory();
+ pathTable = new Path(pathWorking, "TestTableStorer");
+ System.out.println("pathTable =" + pathTable);
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+ "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+ "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ final int numsBatch = 10;
+ final int numsInserters = 1;
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, (9-b) + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ /**
+ * Return the name of the routine that called getCurrentMethodName
+ *
+ */
+ public String getCurrentMethodName() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ (new Throwable()).printStackTrace(pw);
+ pw.flush();
+ String stackTrace = baos.toString();
+ pw.close();
+
+ StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+ tok.nextToken(); // 'java.lang.Throwable'
+ tok.nextToken(); // 'at ...getCurrentMethodName'
+ String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+ // Parse line 3
+ tok = new StringTokenizer(l.trim(), " <(");
+ String t = tok.nextToken(); // 'at'
+ t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+ return t;
+ }
+
+ @Test
+ public void testStorer() throws ExecException, IOException {
+ /*
+ * Use pig LOAD to load testing data for store
+ */
+ String query = "records = LOAD '" + pathTable.toString()
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query);
+
+ /*
+ Iterator<Tuple> it2 = pigServer.openIterator("records");
+ int row0 = 0;
+ Tuple RowValue2 = null;
+ while (it2.hasNext()) {
+ // Last row value
+ RowValue2 = it2.next();
+ row0++;
+ if (row0 == 10) {
+ Assert.assertEquals("0_01", RowValue2.get(1));
+ Assert.assertEquals("0_00", RowValue2.get(0));
+ }
+ }
+ Assert.assertEquals(10, row0);
+ */
+
+ String orderby = "srecs = ORDER records BY SF_a;";
+ pigServer.registerQuery(orderby);
+
+ /*
+ * Use pig STORE to store testing data BasicTable.Writer writer = new
+ * BasicTable.Writer(pathTable, "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g",
+ * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf);
+ */
+ Path newPath = new Path(getCurrentMethodName());
+
+ /*
+ * Table1 creation
+ */
+ pigServer
+ .store(
+ "srecs",
+ newPath.toString()+"1",
+ TableStorer.class.getCanonicalName()
+ + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+ String query3 = "records1 = LOAD '"
+ + newPath.toString() + "1"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b', 'sorted');";
+ pigServer.registerQuery(query3);
+
+ /*
+ * Table2 creation
+ */
+ pigServer
+ .store(
+ "srecs",
+ newPath.toString()+"2",
+ TableStorer.class.getCanonicalName()
+ + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+
+ String query4 = "records2 = LOAD '"
+ + newPath.toString() + "2"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query4);
+
+ String filter = "records3 = FILTER records2 BY SF_a > '4';";
+ pigServer.registerQuery(filter);
+
+ Iterator<Tuple> it2 = pigServer.openIterator("records3");
+ int row2 = 0;
+ Tuple RowValue2 = null;
+ while (it2.hasNext()) {
+ // Last row value
+ RowValue2 = it2.next();
+ Assert.assertEquals(7, RowValue2.size());
+ row2++;
+ if (row2 == 5) {
+ Assert.assertEquals("8_01", RowValue2.get(1));
+ Assert.assertEquals("8_00", RowValue2.get(0));
+ }
+ }
+ Assert.assertEquals(6, row2);
+
+ String join = "joinRecords = JOIN records1 BY SF_a, records3 BY SF_a USING \"merge\";";
+ pigServer.registerQuery(join);
+ // check JOIN content
+ Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+ int row = 0;
+ Tuple RowValue3 = null;
+ while (it3.hasNext()) {
+ // Last row value
+ RowValue3 = it3.next();
+ Assert.assertEquals(9, RowValue3.size());
+ row++;
+ if (row == 6) {
+ Assert.assertEquals("9_01", RowValue3.get(1));
+ Assert.assertEquals("9_00", RowValue3.get(0));
+ Assert.assertEquals("9_06", RowValue3.get(8));
+ Assert.assertEquals("9_05", RowValue3.get(7));
+ Assert.assertEquals("9_04", RowValue3.get(6));
+ Assert.assertEquals("9_03", RowValue3.get(5));
+ Assert.assertEquals("9_02", RowValue3.get(4));
+ Assert.assertEquals("9_01", RowValue3.get(3));
+ Assert.assertEquals("9_00", RowValue3.get(2));
+ }
+ }
+ Assert.assertEquals(6, row);
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java?rev=833166&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableMergeJoinFloat.java Thu Nov 5 21:02:57 2009
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ *
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the
+ * app/debug configuration, when run this from inside the Eclipse.
+ *
+ */
+public class TestTableMergeJoinFloat {
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ private static Path pathTable;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System
+ .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ Configuration conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ Path pathWorking = fs.getWorkingDirectory();
+ pathTable = new Path(pathWorking, "TestTableStorer");
+ System.out.println("pathTable =" + pathTable);
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+ "SF_a:float,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+ "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+ Schema schema = writer.getSchema();
+ System.out.println("typeName" + schema.getColumn("SF_a").getType().pigDataType());
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ final int numsBatch = 10;
+ final int numsInserters = 1;
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ if(k==0) {
+ Float f = 32.987f;
+ tuple.set(0, k+b-f);
+ } else {
+ tuple.set(k, b + "_" + i + "" + k);
+ }
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ /**
+ * Return the name of the routine that called getCurrentMethodName
+ *
+ */
+ public String getCurrentMethodName() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ (new Throwable()).printStackTrace(pw);
+ pw.flush();
+ String stackTrace = baos.toString();
+ pw.close();
+
+ StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+ tok.nextToken(); // 'java.lang.Throwable'
+ tok.nextToken(); // 'at ...getCurrentMethodName'
+ String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+ // Parse line 3
+ tok = new StringTokenizer(l.trim(), " <(");
+ String t = tok.nextToken(); // 'at'
+ t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+ return t;
+ }
+
+ @Test
+ public void testStorer() throws ExecException, IOException {
+ /*
+ * Use pig LOAD to load testing data for store
+ */
+ String query = "records = LOAD '" + pathTable.toString()
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query);
+
+
+ String orderby = "srecs = ORDER records BY SF_a;";
+ pigServer.registerQuery(orderby);
+
+ Path newPath = new Path(getCurrentMethodName());
+
+ /*
+ * Table1 creation
+ */
+
+
+ pigServer
+ .store(
+ "srecs",
+ newPath.toString()+"1",
+ TableStorer.class.getCanonicalName()
+ + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+ String query3 = "records1 = LOAD '"
+ + newPath.toString() + "1"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b', 'sorted');";
+ pigServer.registerQuery(query3);
+ /*
+ * Table2 creation
+ */
+
+ pigServer
+ .store(
+ "srecs",
+ newPath.toString()+"2",
+ TableStorer.class.getCanonicalName()
+ + "('[SF_a, SF_b, SF_c]; [SF_e]')");
+
+
+ String query4 = "records2 = LOAD '"
+ + newPath.toString() + "2"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query4);
+
+ String join = "joinRecords = JOIN records1 BY SF_a, records2 BY SF_a USING \"merge\";";
+ pigServer.registerQuery(join);
+ // check JOIN content
+ Iterator<Tuple> it3 = pigServer.openIterator("joinRecords");
+ int row = 0;
+ Tuple RowValue3 = null;
+ while (it3.hasNext()) {
+ // Last row value
+ RowValue3 = it3.next();
+ Assert.assertEquals(9, RowValue3.size());
+ row++;
+ if (row == 10) {
+ Assert.assertEquals("9_01", RowValue3.get(1));
+ Assert.assertEquals(-23.987f, RowValue3.get(0));
+ Assert.assertEquals("9_06", RowValue3.get(8));
+ Assert.assertEquals("9_05", RowValue3.get(7));
+ Assert.assertEquals("9_04", RowValue3.get(6));
+ Assert.assertEquals("9_03", RowValue3.get(5));
+ Assert.assertEquals("9_02", RowValue3.get(4));
+ Assert.assertEquals("9_01", RowValue3.get(3));
+ Assert.assertEquals(-23.987f, RowValue3.get(2));
+ }
+ }
+ Assert.assertEquals(10, row);
+ }
+}