You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [10/23] - in /hadoop/pig/branches/load-store-redesign:
./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebr...
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,1020 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveProjection {
+
	// Zebra schema and column-group storage spec for each of the seven input tables.
	// TABLE1/TABLE2 share the same shape so they can be unioned column-for-column.
	final static String TABLE1_SCHEMA = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
	final static String TABLE1_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";

	final static String TABLE2_SCHEMA = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
	final static String TABLE2_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";

	final static String TABLE3_SCHEMA = "e:string,str1:string,str2:string,int1:int,map1:map(string)";
	final static String TABLE3_STORAGE = "[e, str1]; [str2,int1,map1#{a}]";

	final static String TABLE4_SCHEMA = "int1:int,str1:string,long1:long";
	final static String TABLE4_STORAGE = "[int1]; [str1,long1]";

	final static String TABLE5_SCHEMA = "b:float,d:double,c:long,e:string,f:bytes,int1:int";
	final static String TABLE5_STORAGE = "[b,d]; [c]; [e]; [f,int1]";

	final static String TABLE6_SCHEMA = "e:string,str3:string,str4:string";
	final static String TABLE6_STORAGE = "[e,str3,str4]";

	final static String TABLE7_SCHEMA = "int2:int";
	final static String TABLE7_STORAGE = "[int2]";

	// Monotonically increasing counters used to make each stored sort output
	// path ("...<fileId>") and sort alias ("sort<sortId>") unique across tests.
	static int fileId = 0;
	static int sortId = 0;

	protected static ExecType execType = ExecType.MAPREDUCE;
	private static MiniCluster cluster;
	protected static PigServer pigServer;
	protected static ExecJob pigJob;

	// HDFS paths of the seven tables created in setUp().
	private static Path pathTable1;
	private static Path pathTable2;
	private static Path pathTable3;
	private static Path pathTable4;
	private static Path pathTable5;
	private static Path pathTable6;
	private static Path pathTable7;
	// Maps table alias ("table1"...) to its storage spec, so the union helper
	// can re-store a sorted copy with the same column groups.
	private static HashMap<String, String> tableStorage;

	private static ArrayList<String> tableList;

	private static Configuration conf;

	// Raw row data used both to populate the tables and to size assertions.
	private static Object[][] table1;
	private static Object[][] table2;
	private static Object[][] table3;
	private static Object[][] table4;
	private static Object[][] table5;
	private static Object[][] table6;
	private static Object[][] table7;

	// Shared map values inserted into the map columns of tables 1-3.
	private static Map<String, String> m1;
	private static Map<String, String> m2;
	private static Map<String, String> m3;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ Path pathWorking = fs.getWorkingDirectory();
+
+ pathTable1 = new Path(pathWorking, "table1");
+ pathTable2 = new Path(pathWorking, "table2");
+ pathTable3 = new Path(pathWorking, "table3");
+ pathTable4 = new Path(pathWorking, "table4");
+ pathTable5 = new Path(pathWorking, "table5");
+ pathTable6 = new Path(pathWorking, "table6");
+ pathTable7 = new Path(pathWorking, "table7");
+
+ // Create table1 data
+ m1 = new HashMap<String, String>();
+ m1.put("a","m1-a");
+ m1.put("b","m1-b");
+
+ table1 = new Object[][]{
+ {5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1},
+ {-1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1},
+ {1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1},
+ {1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1},
+ {1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1},
+ {1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1},
+ {1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"), m1},
+ {1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"), m1} };
+
+ // Create table1
+ createTable(pathTable1, TABLE1_SCHEMA, TABLE1_STORAGE, table1);
+
+ // Create table2 data
+ m2 = new HashMap<String, String>();
+ m2.put("a","m2-a");
+ m2.put("b","m2-b");
+
+ table2 = new Object[][] {
+ {15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2},
+ {-1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"), m2},
+ {1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2},
+ {1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"), m2},
+ {1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2},
+ {1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2},
+ {2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2} };
+
+ // Create table2
+ createTable(pathTable2, TABLE2_SCHEMA, TABLE2_STORAGE, table2);
+
+ // Create table3 data
+ m3 = new HashMap<String, String>();
+ m3.put("a","m3-a");
+ m3.put("b","m3-b");
+ m3.put("c","m3-b");
+
+ table3 = new Object[][] {
+ {"a 8", "Cupertino", "California", 1, m3},
+ {"a 7", "San Jose", "California", 2, m3},
+ {"a 6", "Santa Cruz", "California", 3, m3},
+ {"a 5", "Las Vegas", "Nevada", 4, m3},
+ {"a 4", "New York", "New York", 5, m3},
+ {"a 3", "Phoenix", "Arizona", 6, m3},
+ {"a 2", "Dallas", "Texas", 7, m3},
+ {"a 1", "Reno", "Nevada", 8, m3} };
+
+ // Create table3
+ createTable(pathTable3, TABLE3_SCHEMA, TABLE3_STORAGE, table3);
+
+ // Create table4 data
+ table4 = new Object[][] {
+ {1, "Cupertino", 1001L},
+ {2, "San Jose", 1008L},
+ {3, "Santa Cruz", 1008L},
+ {4, "Las Vegas", 1008L},
+ {5, "Dallas", 1010L},
+ {6, "Reno", 1000L} };
+
+ // Create table4
+ createTable(pathTable4, TABLE4_SCHEMA, TABLE4_STORAGE, table4);
+
+ // Create table5 data
+ table5 = new Object[][] {
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 50e+2, 1000L, "string1", new DataByteArray("blue"), 100},
+ {3.26f, 51e+2, 1001L, "string0", new DataByteArray("purple"),100},
+ {3.24f, 53e+2, 1001L, "string1", new DataByteArray("yellow"),100},
+ {3.28f, 51e+2, 1001L, "string2", new DataByteArray("white"), 100},
+ {3.25f, 53e+2, 1000L, "string0", new DataByteArray("orange"),100} };
+
+ // Create table5
+ createTable(pathTable5, TABLE5_SCHEMA, TABLE5_STORAGE, table5);
+
+ // Create table6 data
+ table6 = new Object[][] {
+ {"a 8", "Cupertino", "California"},
+ {"a 7", "San Jose", "California"},
+ {"a 6", "Santa Cruz", "California"},
+ {"a 5", "Las Vegas", "Nevada"},
+ {"a 4", "New York", "New York"},
+ {"a 3", "Phoenix", "Arizona"},
+ {"a 2", "Dallas", "Texas"},
+ {"a 1", "Reno", "Nevada"} };
+
+ // Create table6
+ createTable(pathTable6, TABLE6_SCHEMA, TABLE6_STORAGE, table6);
+
+ // Create table7 data
+ table7 = new Object[][] {
+ {1001},
+ {1000},
+ {1010},
+ {1009},
+ {1001} };
+
+ // Create table7
+ createTable(pathTable7, TABLE7_SCHEMA, TABLE7_STORAGE, table7);
+
+ // Load tables
+ String query1 = "table1 = LOAD '" + pathTable1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query1);
+ String query2 = "table2 = LOAD '" + pathTable2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query2);
+ String query3 = "table3 = LOAD '" + pathTable3.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query3);
+ String query4 = "table4 = LOAD '" + pathTable4.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query4);
+ String query5 = "table5 = LOAD '" + pathTable5.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query5);
+ String query6 = "table6 = LOAD '" + pathTable6.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query6);
+ String query7 = "table7 = LOAD '" + pathTable7.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query7);
+
+ // Map for table storage
+ tableStorage = new HashMap<String, String>();
+ tableStorage.put("table1", TABLE1_STORAGE);
+ tableStorage.put("table2", TABLE2_STORAGE);
+ tableStorage.put("table3", TABLE3_STORAGE);
+ tableStorage.put("table4", TABLE4_STORAGE);
+ tableStorage.put("table5", TABLE5_STORAGE);
+ tableStorage.put("table6", TABLE6_STORAGE);
+ tableStorage.put("table7", TABLE7_STORAGE);
+ }
+
+ private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+ throws IOException {
+ //
+ // Create table from tableData array
+ //
+ BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TableInserter inserter = writer.getInserter("ins", false);
+
+ for (int i = 0; i < tableData.length; ++i) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tableData[i].length; ++k) {
+ tuple.set(k, tableData[i][k]);
+ System.out.println("DEBUG: setting tuple k=" + k + "value= " + tableData[i][k]);
+ }
+ inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+ }
+ inserter.close();
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ private Iterator<Tuple> testOrderPreserveUnion(Map<String, String> inputTables, String columns) throws IOException {
+ //
+ // Test order preserve union from input tables and provided output columns
+ //
+ Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+
+ Path newPath = new Path(getCurrentMethodName());
+ ArrayList<String> pathList = new ArrayList<String>();
+ tableList = new ArrayList<String>(); // list of tables to load
+
+ // Load and store each of the input tables
+ for (Iterator<String> it = inputTables.keySet().iterator(); it.hasNext();) {
+
+ String tablename = it.next();
+ String sortkey = inputTables.get(tablename);
+
+ String sortName = "sort" + ++sortId;
+
+ // Sort tables
+ String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;"; // need single quotes?
+ pigServer.registerQuery(orderby);
+
+ String sortPath = new String(newPath.toString() + ++fileId); // increment fileId suffix
+
+ // Store sorted tables
+ pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName() +
+ "('" + tableStorage.get(tablename) + "')");
+ Assert.assertNull(pigJob.getException());
+
+ tableList.add(tablename); // add table name to list
+ pathList.add(sortPath); // add table path to list
+ }
+
+ String paths = new String();
+ for (String path:pathList)
+ paths += path + ",";
+ paths = paths.substring(0, paths.lastIndexOf(",")); // remove trailing comma
+
+ String queryLoad = "records1 = LOAD '"
+ + paths
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+
+ System.out.println("queryLoad: " + queryLoad);
+
+ pigServer.registerQuery(queryLoad);
+
+ // Return iterator
+ Iterator<Tuple> it1 = pigServer.openIterator("records1");
+ return it1;
+ }
+
+
+ @Test
+ public void test_sorted_table_union_01() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "a,b,c"); // input table and sort keys
+ inputTables.put("table2", "a,b,c"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "a,b,c,d,e,f,m1");
+
+ // Verify union table
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, -1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"),m2);
+ addResultRow(resultTable, -1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1);
+ addResultRow(resultTable, 5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1);
+ addResultRow(resultTable, 15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2);
+
+ addResultRow(resultTable, 1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"),m1);
+ addResultRow(resultTable, 1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1);
+ addResultRow(resultTable, 1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2);
+ addResultRow(resultTable, 1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1);
+
+ addResultRow(resultTable, 1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2);
+ addResultRow(resultTable, 1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1);
+
+ addResultRow(resultTable, 1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"),m2);
+ addResultRow(resultTable, 1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"),m1);
+ addResultRow(resultTable, 2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+ @Test
+ public void test_sorted_table_union_02() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "a,b,c"); // input table and sort keys
+ inputTables.put("table2", "a,b,c"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "a,b,c,d,e,f,m1,source_table");
+
+ // Verify union table
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, -1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"),m2, 1);
+ addResultRow(resultTable, -1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1, 0);
+ addResultRow(resultTable, 5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1, 0);
+ addResultRow(resultTable, 15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2, 1);
+
+ addResultRow(resultTable, 1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"),m1, 0);
+ addResultRow(resultTable, 1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1, 0);
+ addResultRow(resultTable, 1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2, 1);
+ addResultRow(resultTable, 1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1, 0);
+
+ addResultRow(resultTable, 1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2, 1);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1, 0);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2, 1);
+ addResultRow(resultTable, 1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1, 0);
+
+ addResultRow(resultTable, 1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"),m2, 1);
+ addResultRow(resultTable, 1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"),m1, 0);
+ addResultRow(resultTable, 2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2, 1);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+ @Test
+ public void test_sorted_table_union_03() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "a,b,c"); // input table and sort keys
+ inputTables.put("table2", "a,b"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "a,b,c,d,e,f,m1,source_table");
+
+ // Verify union table
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, -1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"),m2, 1);
+ addResultRow(resultTable, -1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1, 0);
+ addResultRow(resultTable, 5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1, 0);
+ addResultRow(resultTable, 15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2, 1);
+
+ addResultRow(resultTable, 1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"),m1, 0);
+ addResultRow(resultTable, 1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2, 1);
+ addResultRow(resultTable, 1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1, 0);
+ addResultRow(resultTable, 1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1, 0);
+
+ addResultRow(resultTable, 1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2, 1);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2, 1);
+ addResultRow(resultTable, 1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1, 0);
+ addResultRow(resultTable, 1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1, 0);
+
+ addResultRow(resultTable, 1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"),m2, 1);
+ addResultRow(resultTable, 1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"),m1, 0);
+ addResultRow(resultTable, 2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2, 1);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+ @Test
+ public void test_sorted_table_union_04() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "source_table,e,c,b,a");
+
+ // Verify union table
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, 0, "Apple", 1003L, 100.0f, 1001);
+ addResultRow(resultTable, 0, "Hadoop", 1000L, 28.0f, 1002);
+ addResultRow(resultTable, 0, "Pig", 1000L, 50.0f, 1001);
+ addResultRow(resultTable, 0, "Zebra", 1001L, -3.25f, 5);
+
+ addResultRow(resultTable, 0, "apple", 1001L, 101.0f, 1001);
+ addResultRow(resultTable, 1, "beige", 1006L, 33.0f, 2000);
+ addResultRow(resultTable, 1, "brown", 1001L, 53.0f, 1001);
+ addResultRow(resultTable, 1, "gray", 1008L, 50.0f, 1001);
+
+ addResultRow(resultTable, 1, "green", 1004L, 56.0f, 15);
+ addResultRow(resultTable, 0, "hadoop", 1002L, 0.0f, 1000);
+ addResultRow(resultTable, 1, "orange", 1002L, -99.0f, -1);
+ addResultRow(resultTable, 0, "pig", 1001L, 52.0f, 1001);
+
+ addResultRow(resultTable, 1, "purple", 1001L, 102.0f, 1001);
+ addResultRow(resultTable, 1, "white", 1003L, 100.0f, 1001);
+ addResultRow(resultTable, 0, "zebra", 1000L, 3.25f, -1);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+
	@Test
	public void test_sorted_table_union_05() throws ExecException, IOException {
		//
		// Test sorted union
		//
		// Unions table1, table2, and table5 sorted on (e,c). table5 has no
		// 'a' or 'm1' columns, so those project as null for its rows
		// (source_table index 2).

		// Create input tables for order preserve union
		Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
		inputTables.put("table1", "e,c"); // input table and sort keys
		inputTables.put("table2", "e,c"); // input table and sort keys
		inputTables.put("table5", "e,c"); // input table and sort keys

		// Test with input tables and provided output columns
		testOrderPreserveUnion(inputTables, "e,c,a,b,d,f,m1,source_table");

		// Expected union rows, in (e,c) order
		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();

		addResultRow(resultTable, "Apple",	1003L,	1001,	100.0f,	50e+2,	new DataByteArray("Apple"),	m1,	0);
		addResultRow(resultTable, "Hadoop",	1000L,	1002,	28.0f,	50e+2,	new DataByteArray("Hadoop"),m1,	0);
		addResultRow(resultTable, "Pig",	1000L,	1001,	50.0f,	50e+2,	new DataByteArray("Pig"),	m1,	0);
		addResultRow(resultTable, "Zebra",	1001L,	5,		-3.25f,	51e+2,	new DataByteArray("Zebra"),	m1,	0);

		addResultRow(resultTable, "apple",	1001L,	1001,	101.0f,	50e+2,	new DataByteArray("apple"),	m1,	0);
		addResultRow(resultTable, "beige",	1006L,	2000,	33.0f,	52e+2,	new DataByteArray("beige"),	m2,	1);
		addResultRow(resultTable, "brown",	1001L,	1001,	53.0f,	52e+2,	new DataByteArray("brown"),	m2,	1);
		addResultRow(resultTable, "gray",	1008L,	1001,	50.0f,	52e+2,	new DataByteArray("gray"),	m2,	1);

		addResultRow(resultTable, "green",	1004L,	15,		56.0f,	50e+2,	new DataByteArray("green"),	m2,	1);
		addResultRow(resultTable, "hadoop",	1002L,	1000,	0.0f,	52e+2,	new DataByteArray("hadoop"),m1,	0);
		addResultRow(resultTable, "orange",	1002L,	-1,		-99.0f,	51e+2,	new DataByteArray("orange"),m2,	1);
		addResultRow(resultTable, "pig",	1001L,	1001,	52.0f,	50e+2,	new DataByteArray("pig"),	m1,	0);

		addResultRow(resultTable, "purple",	1001L,	1001,	102.0f,	52e+2,	new DataByteArray("purple"),m2,	1);
		// table5 rows: a and m1 are null because table5's schema lacks them
		addResultRow(resultTable, "string0",1000L,	null,	3.25f,	53e+2,	new DataByteArray("orange"),null,	2);
		addResultRow(resultTable, "string0",1001L,	null,	3.26f,	51e+2,	new DataByteArray("purple"),null,	2);
		addResultRow(resultTable, "string1",1000L,	null,	3.25f,	50e+2,	new DataByteArray("blue"),	null,	2);

		addResultRow(resultTable, "string1",1001L,	null,	3.25f,	51e+2,	new DataByteArray("green"),	null,	2);
		addResultRow(resultTable, "string1",1001L,	null,	3.24f,	53e+2,	new DataByteArray("yellow"),null,	2);
		addResultRow(resultTable, "string2",1001L,	null,	3.28f,	51e+2,	new DataByteArray("white"),	null,	2);
		addResultRow(resultTable, "white",	1003L,	1001,	100.0f,	55e+2,	new DataByteArray("white"),	m2,	1);

		addResultRow(resultTable, "zebra",	1000L,	-1,		3.25f,	50e+2,	new DataByteArray("zebra"),	m1,	0);

		// Verify union table
		Iterator<Tuple> it = pigServer.openIterator("records1");
		int numbRows = verifyTable(resultTable, 0, it);

		Assert.assertEquals(numbRows, table1.length + table2.length + table5.length);
	}
+
	@Test
	public void test_sorted_table_union_06() throws ExecException, IOException {
		//
		// Test sorted union
		//
		// Unions table5, table1 and table2 sorted on e, with the sort column
		// projected LAST; verifyTable is told the sort column's index (7).

		// Create input tables for order preserve union
		Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
		inputTables.put("table5", "e"); // input table and sort keys
		inputTables.put("table1", "e"); // input table and sort keys
		inputTables.put("table2", "e"); // input table and sort keys

		// Test with input tables and provided output columns
		testOrderPreserveUnion(inputTables, "b,a,source_table,c,d,f,m1,e");

		// Expected union rows, in e order; table5 rows have null a and m1
		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();

		addResultRow(resultTable, 100.0f,	1001,	0,	1003L,	50e+2,	new DataByteArray("Apple"),	m1,	"Apple");
		addResultRow(resultTable, 28.0f,	1002,	0,	1000L,	50e+2,	new DataByteArray("Hadoop"),m1,	"Hadoop");
		addResultRow(resultTable, 50.0f,	1001,	0,	1000L,	50e+2,	new DataByteArray("Pig"),	m1,	"Pig");
		addResultRow(resultTable, -3.25f,	5,		0,	1001L,	51e+2,	new DataByteArray("Zebra"),	m1,	"Zebra");

		addResultRow(resultTable, 101.0f,	1001,	0,	1001L,	50e+2,	new DataByteArray("apple"),	m1,	"apple");
		addResultRow(resultTable, 33.0f,	2000,	1,	1006L,	52e+2,	new DataByteArray("beige"),	m2,	"beige");
		addResultRow(resultTable, 53.0f,	1001,	1,	1001L,	52e+2,	new DataByteArray("brown"),	m2,	"brown");
		addResultRow(resultTable, 50.0f,	1001,	1,	1008L,	52e+2,	new DataByteArray("gray"),	m2,	"gray");

		addResultRow(resultTable, 56.0f,	15,		1,	1004L,	50e+2,	new DataByteArray("green"),	m2,	"green");
		addResultRow(resultTable, 0.0f,		1000,	0,	1002L,	52e+2,	new DataByteArray("hadoop"),m1,	"hadoop");
		addResultRow(resultTable, -99.0f,	-1,		1,	1002L,	51e+2,	new DataByteArray("orange"),m2,	"orange");
		addResultRow(resultTable, 52.0f,	1001,	0,	1001L,	50e+2,	new DataByteArray("pig"),	m1,	"pig");

		addResultRow(resultTable, 102.0f,	1001,	1,	1001L,	52e+2,	new DataByteArray("purple"),m2,	"purple");
		addResultRow(resultTable, 3.26f,	null,	2,	1001L,	51e+2,	new DataByteArray("purple"),null,"string0");
		addResultRow(resultTable, 3.25f,	null,	2,	1000L,	53e+2,	new DataByteArray("orange"),null,"string0");
		addResultRow(resultTable, 3.25f,	null,	2,	1001L,	51e+2,	new DataByteArray("green"),	null,"string1");

		addResultRow(resultTable, 3.25f,	null,	2,	1000L,	50e+2,	new DataByteArray("blue"),	null,"string1");
		addResultRow(resultTable, 3.24f,	null,	2,	1001L,	53e+2,	new DataByteArray("yellow"),null,"string1");
		addResultRow(resultTable, 3.28f,	null,	2,	1001L,	51e+2,	new DataByteArray("white"),	null,"string2");
		addResultRow(resultTable, 100.0f,	1001,	1,	1003L,	55e+2,	new DataByteArray("white"),	m2,	"white");

		addResultRow(resultTable, 3.25f,	-1,		0,	1000L,	50e+2,	new DataByteArray("zebra"),	m1,	"zebra");

		// Verify union table
		// NOTE(review): the 7 presumably tells verifyTable which column holds
		// the sort key ('e' is at index 7 here) — confirm against verifyTable.
		Iterator<Tuple> it = pigServer.openIterator("records1");
		int numbRows = verifyTable(resultTable, 7, it);

		Assert.assertEquals(numbRows, table1.length + table2.length + table5.length);
	}
+
	@Test
	public void test_sorted_table_union_07() throws ExecException, IOException {
		//
		// Test sorted union
		//
		// Unions table1, table2 and table5 with DIFFERENT sort-key lists per
		// table (e,b,c vs e,b vs e). int1 exists only in table5; m1 only in
		// table1/table2 — the missing columns project as null per source.

		// Create input tables for order preserve union
		Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
		inputTables.put("table1", "e,b,c"); // input table and sort keys
		inputTables.put("table2", "e,b"); // input table and sort keys
		inputTables.put("table5", "e"); // input table and sort keys

		// Test with input tables and provided output columns
		testOrderPreserveUnion(inputTables, "e,source_table,b,c,int1,f,m1");

		// Expected union rows, in e order
		ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();

		addResultRow(resultTable, "Apple",	0,	100.0f,	1003L,	null,	new DataByteArray("Apple"),	m1);
		addResultRow(resultTable, "Hadoop",	0,	28.0f,	1000L,	null,	new DataByteArray("Hadoop"),m1);
		addResultRow(resultTable, "Pig",	0,	50.0f,	1000L,	null,	new DataByteArray("Pig"),	m1);
		addResultRow(resultTable, "Zebra",	0,	-3.25f,	1001L,	null,	new DataByteArray("Zebra"),	m1);

		addResultRow(resultTable, "apple",	0,	101.0f,	1001L,	null,	new DataByteArray("apple"),	m1);
		addResultRow(resultTable, "beige",	1,	33.0f,	1006L,	null,	new DataByteArray("beige"),	m2);
		addResultRow(resultTable, "brown",	1,	53.0f,	1001L,	null,	new DataByteArray("brown"),	m2);
		addResultRow(resultTable, "gray",	1,	50.0f,	1008L,	null,	new DataByteArray("gray"),	m2);

		addResultRow(resultTable, "green",	1,	56.0f,	1004L,	null,	new DataByteArray("green"),	m2);
		addResultRow(resultTable, "hadoop",	0,	0.0f,	1002L,	null,	new DataByteArray("hadoop"),m1);
		addResultRow(resultTable, "orange",	1,	-99.0f,	1002L,	null,	new DataByteArray("orange"),m2);
		addResultRow(resultTable, "pig",	0,	52.0f,	1001L,	null,	new DataByteArray("pig"),	m1);

		addResultRow(resultTable, "purple",	1,	102.0f,	1001L,	null,	new DataByteArray("purple"),m2);
		addResultRow(resultTable, "string0",2,	3.26f,	1001L,	100,	new DataByteArray("purple"),null);
		addResultRow(resultTable, "string0",2,	3.25f,	1000L,	100,	new DataByteArray("orange"),null);
		addResultRow(resultTable, "string1",2,	3.25f,	1001L,	100,	new DataByteArray("green"),null);

		addResultRow(resultTable, "string1",2,	3.25f,	1000L,	100,	new DataByteArray("blue"),null);
		addResultRow(resultTable, "string1",2,	3.24f,	1001L,	100,	new DataByteArray("yellow"),null);
		addResultRow(resultTable, "string2",2,	3.28f,	1001L,	100,	new DataByteArray("white"),null);
		addResultRow(resultTable, "white",	1,	100.0f,	1003L,	null,	new DataByteArray("white"),	m2);

		addResultRow(resultTable, "zebra",	0,	3.25f,	1000L,	null,	new DataByteArray("zebra"),	m1);

		// Verify union table
		Iterator<Tuple> it = pigServer.openIterator("records1");
		int numbRows = verifyTable(resultTable, 0, it);

		Assert.assertEquals(numbRows, table1.length + table2.length + table5.length);
	}
+
+ @Test
+ public void test_sorted_table_union_08() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+ // Union of five tables (table1, table2, table3, table5, table6), each sorted on
+ // column "e". The projection includes columns that exist only in some of the
+ // tables; rows coming from a table that lacks a column carry null there.
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+ inputTables.put("table3", "e"); // input table and sort keys
+ inputTables.put("table5", "e"); // input table and sort keys
+ inputTables.put("table6", "e"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "e,source_table,a,b,c,m1,int1,str1,str2,str3,str4,map1");
+
+ // Expected rows, ordered by sort key "e"; second value is the virtual
+ // source_table index assigned to each input table by the loader.
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, "Apple", 0, 1001, 100.0f, 1003L, m1, null, null, null, null, null, null);
+ addResultRow(resultTable, "Hadoop", 0, 1002, 28.0f, 1000L, m1, null, null, null, null, null, null);
+ addResultRow(resultTable, "Pig", 0, 1001, 50.0f, 1000L, m1, null, null, null, null, null, null);
+ addResultRow(resultTable, "Zebra", 0, 5, -3.25f, 1001L, m1, null, null, null, null, null, null);
+
+ addResultRow(resultTable, "a 1", 1, null, null, null, null, 8, "Reno", "Nevada", null, null, m3);
+ addResultRow(resultTable, "a 1", 4, null, null, null, null, null, null, null, "Reno", "Nevada", null);
+ addResultRow(resultTable, "a 2", 1, null, null, null, null, 7, "Dallas", "Texas", null, null, m3);
+ addResultRow(resultTable, "a 2", 4, null, null, null, null, null, null, null, "Dallas", "Texas", null);
+
+ addResultRow(resultTable, "a 3", 1, null, null, null, null, 6, "Phoenix", "Arizona", null, null, m3);
+ addResultRow(resultTable, "a 3", 4, null, null, null, null, null, null, null, "Phoenix", "Arizona", null);
+ addResultRow(resultTable, "a 4", 1, null, null, null, null, 5, "New York", "New York", null, null, m3);
+ addResultRow(resultTable, "a 4", 4, null, null, null, null, null, null, null, "New York", "New York", null);
+
+ addResultRow(resultTable, "a 5", 1, null, null, null, null, 4, "Las Vegas","Nevada", null, null, m3);
+ addResultRow(resultTable, "a 5", 4, null, null, null, null, null, null, null, "Las Vegas","Nevada", null);
+ addResultRow(resultTable, "a 6", 1, null, null, null, null, 3, "Santa Cruz","California", null, null, m3);
+ addResultRow(resultTable, "a 6", 4, null, null, null, null, null, null, null, "Santa Cruz","California", null);
+
+ addResultRow(resultTable, "a 7", 1, null, null, null, null, 2, "San Jose","California",null, null, m3);
+ addResultRow(resultTable, "a 7", 4, null, null, null, null, null, null, null, "San Jose","California", null);
+ addResultRow(resultTable, "a 8", 1, null, null, null, null, 1, "Cupertino","California",null, null, m3);
+ addResultRow(resultTable, "a 8", 4, null, null, null, null, null, null, null, "Cupertino","California", null);
+
+ addResultRow(resultTable, "apple", 0, 1001, 101.0f, 1001L, m1, null, null, null, null, null, null);
+ addResultRow(resultTable, "beige", 2, 2000, 33.0f, 1006L, m2, null, null, null, null, null, null);
+ addResultRow(resultTable, "brown", 2, 1001, 53.0f, 1001L, m2, null, null, null, null, null, null);
+ addResultRow(resultTable, "gray", 2, 1001, 50.0f, 1008L, m2, null, null, null, null, null, null);
+
+ addResultRow(resultTable, "green", 2, 15, 56.0f, 1004L, m2, null, null, null, null, null, null);
+ addResultRow(resultTable, "hadoop", 0, 1000, 0.0f, 1002L, m1, null, null, null, null, null, null);
+ addResultRow(resultTable, "orange", 2, -1, -99.0f, 1002L, m2, null, null, null, null, null, null);
+ addResultRow(resultTable, "pig", 0, 1001, 52.0f, 1001L, m1, null, null, null, null, null, null);
+
+ addResultRow(resultTable, "purple", 2, 1001, 102.0f, 1001L, m2, null, null, null, null, null, null);
+ addResultRow(resultTable, "string0",3, null, 3.26f, 1001L, null, 100, null, null, null, null, null);
+ addResultRow(resultTable, "string0",3, null, 3.25f, 1000L, null, 100, null, null, null, null, null);
+ addResultRow(resultTable, "string1",3, null, 3.25f, 1001L, null, 100, null, null, null, null, null);
+
+ addResultRow(resultTable, "string1",3, null, 3.25f, 1000L, null, 100, null, null, null, null, null);
+ addResultRow(resultTable, "string1",3, null, 3.24f, 1001L, null, 100, null, null, null, null, null);
+ addResultRow(resultTable, "string2",3, null, 3.28f, 1001L, null, 100, null, null, null, null, null);
+ addResultRow(resultTable, "white", 2, 1001, 100.0f, 1003L, m2, null, null, null, null, null, null);
+
+ addResultRow(resultTable, "zebra", 0, -1, 3.25f, 1000L, m1, null, null, null, null, null, null);
+
+ // Verify union table (key column 0 is the sort key "e")
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ // NOTE(review): Assert.assertEquals takes (expected, actual); the arguments
+ // here look swapped, which only affects the failure message, not the check.
+ Assert.assertEquals(numbRows, table1.length + table2.length + table3.length +
+ table5.length + table6.length);
+ }
+
+ @Test
+ public void test_sorted_table_union_09() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+ // Projection names columns (xx, yy, zz) that exist in neither input table;
+ // they are expected to come back as null for every row.
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "e,source_table,xx,yy,zz"); // extra undefined columns
+
+ // Expected rows, ordered by sort key "e"; second value is source_table index
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, "Apple", 0, null, null, null);
+ addResultRow(resultTable, "Hadoop", 0, null, null, null);
+ addResultRow(resultTable, "Pig", 0, null, null, null);
+ addResultRow(resultTable, "Zebra", 0, null, null, null);
+
+ addResultRow(resultTable, "apple", 0, null, null, null);
+ addResultRow(resultTable, "beige", 1, null, null, null);
+ addResultRow(resultTable, "brown", 1, null, null, null);
+ addResultRow(resultTable, "gray", 1, null, null, null);
+
+ addResultRow(resultTable, "green", 1, null, null, null);
+ addResultRow(resultTable, "hadoop", 0, null, null, null);
+ addResultRow(resultTable, "orange", 1, null, null, null);
+ addResultRow(resultTable, "pig", 0, null, null, null);
+
+ addResultRow(resultTable, "purple", 1, null, null, null);
+ addResultRow(resultTable, "white", 1, null, null, null);
+ addResultRow(resultTable, "zebra", 0, null, null, null);
+
+ // Verify union table (key column 0 is the sort key "e")
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
<br>
+ @Test
+ public void test_sorted_table_union_10() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+ // Tables sorted on the composite key (a,b); the projection string contains
+ // extra commas and whitespace, which yield empty (null) output columns —
+ // visible as the two nulls between c and d in each expected row.
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "a,b"); // input table and sort keys
+ inputTables.put("table2", "a,b"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "a, b, c ,,, d , e,,,"); // extra commas and spaces
+
+ // Expected rows, ordered by (a,b)
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, -1, -99.0f, 1002L, null, null, 51e+2, "orange");
+ addResultRow(resultTable, -1, 3.25f, 1000L, null, null, 50e+2, "zebra");
+ addResultRow(resultTable, 5, -3.25f, 1001L, null, null, 51e+2, "Zebra");
+ addResultRow(resultTable, 15, 56.0f, 1004L, null, null, 50e+2, "green");
+
+ addResultRow(resultTable, 1000, 0.0f, 1002L, null, null, 52e+2, "hadoop");
+ addResultRow(resultTable, 1001, 50.0f, 1008L, null, null, 52e+2, "gray");
+ addResultRow(resultTable, 1001, 50.0f, 1000L, null, null, 50e+2, "Pig");
+ addResultRow(resultTable, 1001, 52.0f, 1001L, null, null, 50e+2, "pig");
+
+ addResultRow(resultTable, 1001, 53.0f, 1001L, null, null, 52e+2, "brown");
+ addResultRow(resultTable, 1001, 100.0f, 1003L, null, null, 50e+2, "Apple");
+ addResultRow(resultTable, 1001, 100.0f, 1003L, null, null, 55e+2, "white");
+ addResultRow(resultTable, 1001, 101.0f, 1001L, null, null, 50e+2, "apple");
+
+ addResultRow(resultTable, 1001, 102.0f, 1001L, null, null, 52e+2, "purple");
+ addResultRow(resultTable, 1002, 28.0f, 1000L, null, null, 50e+2, "Hadoop");
+ addResultRow(resultTable, 2000, 33.0f, 1006L, null, null, 52e+2, "beige");
+
+ // Verify union table (key column 0 is the leading sort key "a")
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 0, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+ @Test
+ public void test_sorted_table_union_11() throws ExecException, IOException {
+ //
+ // Test sorted union
+ //
+ // The sort key "e" is deliberately left out of the projection; verification
+ // instead keys on column index 4 (f), whose bytes mirror e's value.
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+
+ // Test with input tables and provided output columns
+ testOrderPreserveUnion(inputTables, "source_table,b,c,int1,f,m1"); // sort key e not in projection
+
+ // Expected rows; first value is the virtual source_table index
+ ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+
+ addResultRow(resultTable, 0, 100.0f, 1003L, null, new DataByteArray("Apple"), m1);
+ addResultRow(resultTable, 0, 28.0f, 1000L, null, new DataByteArray("Hadoop"),m1);
+ addResultRow(resultTable, 0, 50.0f, 1000L, null, new DataByteArray("Pig"), m1);
+ addResultRow(resultTable, 0, -3.25f, 1001L, null, new DataByteArray("Zebra"), m1);
+
+ addResultRow(resultTable, 0, 101.0f, 1001L, null, new DataByteArray("apple"), m1);
+ addResultRow(resultTable, 1, 33.0f, 1006L, null, new DataByteArray("beige"), m2);
+ addResultRow(resultTable, 1, 53.0f, 1001L, null, new DataByteArray("brown"), m2);
+ addResultRow(resultTable, 1, 50.0f, 1008L, null, new DataByteArray("gray"), m2);
+
+ addResultRow(resultTable, 1, 56.0f, 1004L, null, new DataByteArray("green"), m2);
+ addResultRow(resultTable, 0, 0.0f, 1002L, null, new DataByteArray("hadoop"),m1);
+ addResultRow(resultTable, 1, -99.0f, 1002L, null, new DataByteArray("orange"),m2);
+ addResultRow(resultTable, 0, 52.0f, 1001L, null, new DataByteArray("pig"), m1);
+
+ addResultRow(resultTable, 1, 102.0f, 1001L, null, new DataByteArray("purple"),m2);
+ addResultRow(resultTable, 1, 100.0f, 1003L, null, new DataByteArray("white"), m2);
+ addResultRow(resultTable, 0, 3.25f, 1000L, null, new DataByteArray("zebra"), m1);
+
+ // Verify union table, keyed on column 4 (f) since e is not projected
+ Iterator<Tuple> it = pigServer.openIterator("records1");
+ int numbRows = verifyTable(resultTable, 4, it);
+
+ Assert.assertEquals(numbRows, table1.length + table2.length);
+ }
+
+ /**
+  * Verify the union output table against the expected results.
+  *
+  * resultTable must be ordered by the primary sort key. Rows read from the
+  * iterator are matched against the run of expected rows sharing the same key
+  * value: a sorted union only guarantees ordering between distinct key values,
+  * not among tied rows, so any row within the current key's run may match.
+  *
+  * @param resultTable expected rows, ordered by the primary sort key
+  * @param keyColumn   index of the primary sort key column in each row
+  * @param it          iterator over the actual union output
+  * @return number of rows read from the iterator
+  */
+ private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException {
+ int numbRows = 0;
+ int index = 0;
+ Object value = resultTable.get(index).get(keyColumn); // get value of primary key
+
+ while (it.hasNext()) {
+ Tuple rowValues = it.next();
+
+ // If the primary sort key does NOT match the current key, advance to the
+ // next distinct key in the expected table (the original comment here had
+ // the condition inverted).
+ if (! compareObj(value, rowValues.get(keyColumn))) {
+ int subIndex = index + 1;
+ while (subIndex < resultTable.size()) {
+ if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) { // found new key
+ index = subIndex;
+ value = resultTable.get(index).get(keyColumn);
+ break;
+ }
+ ++subIndex;
+ }
+ Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : "
+ + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+ }
+ // Search the run of expected rows with this primary key for a matching row
+ int subIndex = index;
+
+ while (subIndex < resultTable.size()) {
+ // Compare row
+ ArrayList<Object> resultRow = resultTable.get(subIndex);
+ if ( compareRow(rowValues, resultRow) )
+ break; // found matching row
+ ++subIndex;
+ // BUGFIX: the original called resultTable.get(subIndex) here without a
+ // bounds check, so exhausting the expected rows raised an
+ // IndexOutOfBoundsException instead of a readable assertion failure.
+ Assert.assertTrue("Table comparison error for row : " + numbRows + " - no matching row found for : "
+ + rowValues.get(keyColumn),
+ subIndex < resultTable.size() && compareObj(value, resultTable.get(subIndex).get(keyColumn)));
+ }
+ ++numbRows;
+ }
+ Assert.assertEquals(resultTable.size(), numbRows); // verify expected row count
+ return numbRows;
+ }
+
+ /**
+  * Compare one actual output row against one expected row, value by value.
+  * Asserts that both rows have the same width before comparing.
+  *
+  * @return true if every column matches (null-safe), false on first mismatch
+  */
+ private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+ Assert.assertEquals(resultRow.size(), rowValues.size());
+ for (int col = 0; col < rowValues.size(); ++col) {
+ if (! compareObj(rowValues.get(col), resultRow.get(col)) ) {
+ return false; // first differing column decides the outcome
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Null-safe equality check for two table values: two nulls are equal,
+  * otherwise delegate to equals() on the first operand.
+  */
+ private boolean compareObj(Object object1, Object object2) {
+ return (object1 == null) ? (object2 == null) : object1.equals(object2);
+ }
+
+ /**
+  * Append one row of expected values to the expected-results table.
+  */
+ private void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object ... values) {
+ ArrayList<Object> row = new ArrayList<Object>();
+ for (Object value : values) {
+ row.add(value);
+ }
+ resultTable.add(row);
+ }
+
+ /**
+  * Dump every row of a Pig relation to stdout (debugging aid).
+  *
+  * @return the number of rows printed
+  */
+ private int printTable(String tablename) throws IOException {
+ int rowCount = 0;
+ Iterator<Tuple> rows = pigServer.openIterator(tablename);
+ while (rows.hasNext()) {
+ Tuple row = rows.next();
+ ++rowCount;
+ System.out.println();
+ for (int i = 0; i < row.size(); ++i) {
+ System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + row.get(i));
+ }
+ }
+ System.out.println("\nRow count : " + rowCount);
+ return rowCount;
+ }
+
+ /**
+  * Print every loaded table path together with its load index (debugging aid).
+  */
+ private void printTableList() {
+ System.out.println();
+ for (int idx = 0; idx < tableList.size(); ++idx) {
+ System.out.println("Load table " + idx + " : " + tableList.get(idx));
+ }
+ }
+
+ /**
+  * Return the name of the routine that called getCurrentMethodName.
+  *
+  * Works by printing a fresh Throwable's stack trace to a string and parsing
+  * the third line ("at <caller>"), so it is sensitive to the JVM's stack
+  * trace text format.
+  */
+ private String getCurrentMethodName() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ (new Throwable()).printStackTrace(pw);
+ pw.flush();
+ String stackTrace = baos.toString();
+ pw.close();
+
+ // Skip the first two trace lines and keep the caller's frame
+ StringTokenizer tok = new StringTokenizer(stackTrace, "\n");
+ tok.nextToken(); // 'java.lang.Throwable'
+ tok.nextToken(); // 'at ...getCurrentMethodName'
+ String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>'
+ // Parse line 3: split on space, '<' and '(' to isolate the qualified name
+ tok = new StringTokenizer(l.trim(), " <(");
+ String t = tok.nextToken(); // 'at'
+ t = tok.nextToken(); // '...<caller to getCurrentRoutine>'
+ return t;
+ }
+
+}
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,838 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestOrderPreserveProjectionNegative {
+
+ // Zebra schema strings and column-group storage specs ([...] groups) for the
+ // seven test tables created in setUp().
+ final static String TABLE1_SCHEMA = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+ final static String TABLE1_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";
+
+ final static String TABLE2_SCHEMA = "a:int,b:float,c:long,d:double,e:string,f:bytes,m1:map(string)";
+ final static String TABLE2_STORAGE = "[a, b, c]; [d, e, f]; [m1#{a}]";
+
+ final static String TABLE3_SCHEMA = "e:string,str1:string,str2:string,int1:int,map1:map(string)";
+ final static String TABLE3_STORAGE = "[e, str1]; [str2,int1,map1#{a}]";
+
+ final static String TABLE4_SCHEMA = "int1:int,str1:string,long1:long";
+ final static String TABLE4_STORAGE = "[int1]; [str1,long1]";
+
+ final static String TABLE5_SCHEMA = "b:float,d:double,c:long,e:string,f:bytes,int1:int";
+ final static String TABLE5_STORAGE = "[b,d]; [c]; [e]; [f,int1]";
+
+ final static String TABLE6_SCHEMA = "a:string,b:string,str1:string";
+ final static String TABLE6_STORAGE = "[a,b,str1]";
+
+ final static String TABLE7_SCHEMA = "int2:int";
+ final static String TABLE7_STORAGE = "[int2]";
+
+ static Path pathWorking = null;
+
+ // Monotonic counters giving each stored table / sort relation a unique suffix
+ static int fileId = 0;
+ static int sortId = 0;
+
+ protected static ExecType execType = ExecType.MAPREDUCE;
+ private static MiniCluster cluster;
+ protected static PigServer pigServer;
+ protected static ExecJob pigJob;
+
+ // Paths of the tables created in setUp()
+ private static Path pathTable1;
+ private static Path pathTable2;
+ private static Path pathTable3;
+ private static Path pathTable4;
+ private static Path pathTable5;
+ private static Path pathTable6;
+ private static Path pathTable7;
+ // Maps table alias -> its storage spec, for use when re-storing sorted copies
+ private static HashMap<String, String> tableStorage;
+
+ private static Configuration conf;
+
+ // Raw row data used to populate each table
+ private static Object[][] table1;
+ private static Object[][] table2;
+ private static Object[][] table3;
+ private static Object[][] table4;
+ private static Object[][] table5;
+ private static Object[][] table6;
+ private static Object[][] table7;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Start a MiniCluster-backed PigServer (or local mode), create and populate
+ // the seven zebra test tables, register a LOAD alias for each, and record
+ // each table's storage spec for later re-store of sorted copies.
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ // NOTE(review): "." + "./logs" concatenates to "../logs" (the parent
+ // directory) — presumably "./logs" was intended; confirm.
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+ }
+
+ if (execType == ExecType.MAPREDUCE) {
+ cluster = MiniCluster.buildCluster();
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ } else {
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ conf = new Configuration();
+ FileSystem fs = cluster.getFileSystem();
+ pathWorking = fs.getWorkingDirectory();
+
+ pathTable1 = new Path(pathWorking, "table1");
+ pathTable2 = new Path(pathWorking, "table2");
+ pathTable3 = new Path(pathWorking, "table3");
+ pathTable4 = new Path(pathWorking, "table4");
+ pathTable5 = new Path(pathWorking, "table5");
+ pathTable6 = new Path(pathWorking, "table6");
+ pathTable7 = new Path(pathWorking, "table7");
+
+ // Create table1 data
+ Map<String, String> m1 = new HashMap<String, String>();
+ m1.put("a","m1-a");
+ m1.put("b","m1-b");
+
+ table1 = new Object[][]{
+ {5, -3.25f, 1001L, 51e+2, "Zebra", new DataByteArray("Zebra"), m1},
+ {-1, 3.25f, 1000L, 50e+2, "zebra", new DataByteArray("zebra"), m1},
+ {1001, 100.0f, 1003L, 50e+2, "Apple", new DataByteArray("Apple"), m1},
+ {1001, 101.0f, 1001L, 50e+2, "apple", new DataByteArray("apple"), m1},
+ {1001, 50.0f, 1000L, 50e+2, "Pig", new DataByteArray("Pig"), m1},
+ {1001, 52.0f, 1001L, 50e+2, "pig", new DataByteArray("pig"), m1},
+ {1002, 28.0f, 1000L, 50e+2, "Hadoop", new DataByteArray("Hadoop"), m1},
+ {1000, 0.0f, 1002L, 52e+2, "hadoop", new DataByteArray("hadoop"), m1} };
+
+ // Create table1
+ createTable(pathTable1, TABLE1_SCHEMA, TABLE1_STORAGE, table1);
+
+ // Create table2 data
+ Map<String, String> m2 = new HashMap<String, String>();
+ m2.put("a","m2-a");
+ m2.put("b","m2-b");
+
+ table2 = new Object[][] {
+ {15, 56.0f, 1004L, 50e+2, "green", new DataByteArray("green"), m2},
+ {-1, -99.0f, 1002L, 51e+2, "orange", new DataByteArray("orange"), m2},
+ {1001, 100.0f, 1003L, 55e+2, "white", new DataByteArray("white"), m2},
+ {1001, 102.0f, 1001L, 52e+2, "purple", new DataByteArray("purple"), m2},
+ {1001, 50.0f, 1008L, 52e+2, "gray", new DataByteArray("gray"), m2},
+ {1001, 53.0f, 1001L, 52e+2, "brown", new DataByteArray("brown"), m2},
+ {2000, 33.0f, 1006L, 52e+2, "beige", new DataByteArray("beige"), m2} };
+
+ // Create table2
+ createTable(pathTable2, TABLE2_SCHEMA, TABLE2_STORAGE, table2);
+
+ // Create table3 data
+ Map<String, String> m3 = new HashMap<String, String>();
+ m3.put("a","m3-a");
+ m3.put("b","m3-b");
+ m3.put("c","m3-b");
+
+ table3 = new Object[][] {
+ {"a 8", "Cupertino", "California", 1, m3},
+ {"a 7", "San Jose", "California", 2, m3},
+ {"a 6", "Santa Cruz", "California", 3, m3},
+ {"a 5", "Las Vegas", "Nevada", 4, m3},
+ {"a 4", "New York", "New York", 5, m3},
+ {"a 3", "Phoenix", "Arizona", 6, m3},
+ {"a 2", "Dallas", "Texas", 7, m3},
+ {"a 1", "Reno", "Nevada", 8, m3} };
+
+ // Create table3
+ createTable(pathTable3, TABLE3_SCHEMA, TABLE3_STORAGE, table3);
+
+ // Create table4 data
+ table4 = new Object[][] {
+ {1, "Cupertino", 1001L},
+ {2, "San Jose", 1008L},
+ {3, "Santa Cruz", 1008L},
+ {4, "Las Vegas", 1008L},
+ {5, "Dallas", 1010L},
+ {6, "Reno", 1000L} };
+
+ // Create table4
+ createTable(pathTable4, TABLE4_SCHEMA, TABLE4_STORAGE, table4);
+
+ // Create table5 data (six identical rows — exercises duplicate handling)
+ table5 = new Object[][] {
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100},
+ {3.25f, 51e+2, 1001L, "string1", new DataByteArray("green"), 100} };
+
+ // Create table5
+ createTable(pathTable5, TABLE5_SCHEMA, TABLE5_STORAGE, table5);
+
+ // Create table6 data (all-string columns; same a/city/state values as table3)
+ table6 = new Object[][] {
+ {"a 8", "Cupertino", "California"},
+ {"a 7", "San Jose", "California"},
+ {"a 6", "Santa Cruz", "California"},
+ {"a 5", "Las Vegas", "Nevada"},
+ {"a 4", "New York", "New York"},
+ {"a 3", "Phoenix", "Arizona"},
+ {"a 2", "Dallas", "Texas"},
+ {"a 1", "Reno", "Nevada"} };
+
+ // Create table6
+ createTable(pathTable6, TABLE6_SCHEMA, TABLE6_STORAGE, table6);
+
+ // Create table7 data
+ table7 = new Object[][] {
+ {1001},
+ {1000},
+ {1010},
+ {1009},
+ {1001} };
+
+ // Create table7
+ createTable(pathTable7, TABLE7_SCHEMA, TABLE7_STORAGE, table7);
+
+ // Load tables
+ String query1 = "table1 = LOAD '" + pathTable1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query1);
+ String query2 = "table2 = LOAD '" + pathTable2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query2);
+ String query3 = "table3 = LOAD '" + pathTable3.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query3);
+ String query4 = "table4 = LOAD '" + pathTable4.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query4);
+ String query5 = "table5 = LOAD '" + pathTable5.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query5);
+ String query6 = "table6 = LOAD '" + pathTable6.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query6);
+ String query7 = "table7 = LOAD '" + pathTable7.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query7);
+ // table8 is a second alias over the same table7 path
+ String query8 = "table8 = LOAD '" + pathTable7.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+ pigServer.registerQuery(query8);
+
+ // Map for table storage
+ tableStorage = new HashMap<String, String>();
+ tableStorage.put("table1", TABLE1_STORAGE);
+ tableStorage.put("table2", TABLE2_STORAGE);
+ tableStorage.put("table3", TABLE3_STORAGE);
+ tableStorage.put("table4", TABLE4_STORAGE);
+ tableStorage.put("table5", TABLE5_STORAGE);
+ tableStorage.put("table6", TABLE6_STORAGE);
+ tableStorage.put("table7", TABLE7_STORAGE);
+ }
+
+ private static void createTable(Path path, String schemaString, String storageString, Object[][] tableData)
+ throws IOException {
+ //
+ // Create a zebra basic table at the given path and populate it from the
+ // tableData array (one inserted tuple per row, keyed "key0", "key1", ...).
+ //
+ BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+ TableInserter inserter = writer.getInserter("ins", false);
+
+ // BUGFIX: close inserter/writer in finally so a failed tuple.set/insert
+ // does not leak the open table writer.
+ try {
+ for (int i = 0; i < tableData.length; ++i) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tableData[i].length; ++k) {
+ tuple.set(k, tableData[i][k]);
+ // BUGFIX: separator added between "k=<k>" and "value=" in the debug line
+ System.out.println("DEBUG: setting tuple k=" + k + " value= " + tableData[i][k]);
+ }
+ inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+ }
+ } finally {
+ inserter.close();
+ writer.close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // Shut down the PigServer started in setUp()
+ pigServer.shutdown();
+ }
+
+ /**
+  * Sort each input table on its sort key(s), store the sorted copy, then load
+  * all stored copies as one order-preserving ("sorted") union projecting the
+  * requested columns into the relation "records1".
+  *
+  * @param inputTables map of table alias -> comma-separated sort key list
+  * @param columns     projection passed to TableLoader
+  * @return iterator over the unioned relation "records1"
+  */
+ private Iterator<Tuple> testOrderPreserveUnion(Map<String, String> inputTables, String columns) throws IOException {
+ Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+
+ Path newPath = new Path(getCurrentMethodName());
+ ArrayList<String> pathList = new ArrayList<String>();
+
+ // Sort and store each of the input tables
+ for (Map.Entry<String, String> entry : inputTables.entrySet()) {
+
+ String tablename = entry.getKey();
+ String sortkey = entry.getValue();
+
+ String sortName = "sort" + ++sortId;
+
+ // Sort the table on its sort key(s)
+ String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;";
+ pigServer.registerQuery(orderby);
+
+ String sortPath = newPath.toString() + ++fileId; // unique fileId suffix per stored table
+
+ // Store sorted table
+ pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName() +
+ "('" + tableStorage.get(tablename) + "')");
+ Assert.assertNull(pigJob.getException());
+
+ pathList.add(sortPath); // add table path to list
+ }
+
+ // Build the comma-separated path list. (Replaces the original's quadratic
+ // String += loop and fragile lastIndexOf(",")-based trailing-comma strip.)
+ StringBuilder paths = new StringBuilder();
+ for (String path : pathList) {
+ if (paths.length() > 0)
+ paths.append(",");
+ paths.append(path);
+ }
+
+ String queryLoad = "records1 = LOAD '"
+ + paths
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+
+ System.out.println("queryLoad: " + queryLoad);
+
+ pigServer.registerQuery(queryLoad);
+
+ // Return iterator over the union result
+ return pigServer.openIterator("records1");
+ }
+
+ @Test
+ public void union_error_exist_source_table() throws ExecException {
+ //
+ // Test that we do not allow user to create table with column "source_table" (Negative test)
+ //
+ // "source_table" is the loader's reserved virtual column; creating a table
+ // that declares it must fail with an IOException.
+ IOException exception = null;
+
+ String STR_SCHEMA_TEST = "a:string,b:string,source_table:int";
+ String STR_STORAGE_TEST = "[a, b]; [source_table]";
+
+ // Create table1 data
+ Object[][] table_test = new Object[][] {
+ {"a1", "z", 5},
+ {"a2", "r", 4},
+ {"a3", "e", 3},
+ {"a4", "a", 1} };
+
+ Path pathTable1 = new Path(pathWorking, "table_test");
+
+ try {
+ // Create table1 — expected to throw
+ createTable(pathTable1, STR_SCHEMA_TEST, STR_STORAGE_TEST, table_test);
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ //System.out.println(getStackTrace(exception));
+ Assert.assertNotNull(exception);
+ Assert.assertTrue(exception.toString().contains("[source_table] is a reserved virtual column name"));
+ }
+ }
+
+ @Test
+ public void union_error_diff_type() throws ExecException {
+ //
+ // Test sorted union error handling when tables have same sort key of different types (Negative test)
+ //
+ // Column "a" is int in table1 but string in table6; the union must be rejected.
+ IOException exception = null;
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "a"); // input table and sort keys
+ inputTables.put("table6", "a"); // input table and sort keys
+
+ try {
+ // Test with input tables and provided output columns — expected to throw
+ testOrderPreserveUnion(inputTables, "a,b,c,d,e,f,m1");
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ //System.out.println(getStackTrace(exception));
+ Assert.assertNotNull(exception);
+ Assert.assertTrue(getStackTrace(exception).contains("Different types of column"));
+ }
+ }
+
+ @Test
+ public void union_error_complex_type() throws ExecException {
+ //
+ // Test sorted union error handling when tables have complex map sort keys (Negative test)
+ //
+ // Sorting on the map column m1 is unsupported; the union must be rejected.
+ IOException exception = null;
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "m1"); // input table and sort keys
+ inputTables.put("table2", "m1"); // input table and sort keys
+
+ try {
+ // Test with input tables and provided output columns — expected to throw
+ testOrderPreserveUnion(inputTables, "a,b,c,d,e,f,m1");
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ //System.out.println(getStackTrace(exception));
+ Assert.assertNotNull(exception);
+ Assert.assertTrue(getStackTrace(exception).contains("Using Map as key not supported"));
+ }
+ }
+
+ @Test
+ public void union_error_invalid_path1() throws ExecException {
+ //
+ // Test sorted union error handling when one of the table paths is invalid (Negative test)
+ //
+ // Only the first path is actually stored; the second was never written, so
+ // the loader should fail on its missing schema file.
+ IOException exception = null;
+
+ try {
+ // Sort tables
+ String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+ pigServer.registerQuery(orderby1);
+
+ Path newPath = new Path(getCurrentMethodName());
+
+ // Store sorted tables
+ String pathSort1 = newPath.toString() + "1";
+ pigJob = pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+ "('" + TABLE1_STORAGE + "')");
+ Assert.assertNull(pigJob.getException());
+
+ String pathSort2 = newPath.toString() + "2"; // invalid path
+
+ String queryLoad = "records1 = LOAD '"
+ + pathSort1 + ","
+ + pathSort2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,b,c', 'sorted');";
+ pigServer.registerQuery(queryLoad);
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ //System.out.println(getStackTrace(exception));
+ Assert.assertNotNull(exception);
+ Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't exist"));
+ }
+ }
+
+ @Test
+ public void union_error_invalid_path2() throws ExecException {
+ //
+ // Test sorted union error handling when one of the table paths is invalid (Negative test)
+ //
+ // Same as union_error_invalid_path1 but with source_table in the projection.
+ // NOTE(review): both asserts in the finally block are commented out, so this
+ // test can never fail; also, if no exception is thrown,
+ // getStackTrace(exception) is called with null — confirm it tolerates null.
+ IOException exception = null;
+
+ try {
+ // Sort tables
+ String orderby1 = "sort1 = ORDER table1 BY " + "a" + " ;";
+ pigServer.registerQuery(orderby1);
+
+ Path newPath = new Path(getCurrentMethodName());
+
+ // Store sorted tables
+ String pathSort1 = newPath.toString() + "1";
+ pigJob = pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+ "('" + TABLE1_STORAGE + "')");
+ Assert.assertNull(pigJob.getException());
+
+ String pathSort2 = newPath.toString() + "2"; // invalid path
+
+ String queryLoad = "records1 = LOAD '"
+ + pathSort1 + ","
+ + pathSort2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,b,c,source_table', 'sorted');";
+ pigServer.registerQuery(queryLoad);
+
+ printTable("records1");
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ System.out.println(getStackTrace(exception));
+ //Assert.assertNotNull(exception);
+ //Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't exist"));
+ }
+ }
+
+ @Test
+ public void union_error_invalid_column() throws ExecException {
+ //
+ // Test sorted union error handling when some of the output columns are not in any
+ // of the tables (Negative test)
+ //
+ IOException exception = null;
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+
+ try {
+ // Test with input tables and provided output columns; xx,yy,zz do not exist
+ // in any input table
+ testOrderPreserveUnion(inputTables, "e,source_table,a,xx,yy,zz");
+
+ printTable("records1");
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ // NOTE(review): the assertions below are commented out, so this test can
+ // never fail - it only prints the stack trace (if any). Confirm the expected
+ // behavior for unknown projection columns and re-enable or remove.
+ System.out.println(getStackTrace(exception));
+ //Assert.assertNotNull(exception);
+ //Assert.assertTrue(getStackTrace(exception).contains("Using Map as key not supported"));
+ }
+ }
+
+ @Test
+ public void union_error_invalid_key() throws ExecException {
+ //
+ // Test sorted union error handling where column sort keys are missing from
+ // projection (Negative test)
+ //
+ IOException exception = null;
+
+ // Create input tables for order preserve union
+ Map<String, String> inputTables = new HashMap<String, String>(); // Input two or more tables
+ inputTables.put("table1", "e"); // input table and sort keys
+ inputTables.put("table2", "e"); // input table and sort keys
+
+ try {
+ // Test with input tables and provided output columns; the sort key "e" is
+ // deliberately absent from the projection
+ testOrderPreserveUnion(inputTables, "a");
+
+ printTable("records1");
+
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ // NOTE(review): the assertions below are commented out, so this test can
+ // never fail - it only prints the stack trace (if any). Confirm the expected
+ // behavior when the sort key is not projected and re-enable or remove.
+ System.out.println(getStackTrace(exception));
+ //Assert.assertNotNull(exception);
+ //Assert.assertTrue(getStackTrace(exception).contains("Using Map as key not supported"));
+ }
+ }
+
+ /**
+  * Negative test: sorted union where the left (first) table was stored
+  * unsorted. The sorted load is expected to fail with
+  * "The table is not (properly) sorted".
+  */
+ @Test
+ public void union_error_unsorted_left() throws ExecException {
+     IOException exception = null;
+
+     try {
+         // Sort only table2; table1 is stored unsorted on purpose
+         String orderby2 = "sort2 = ORDER table2 BY a ;";
+         pigServer.registerQuery(orderby2);
+
+         Path newPath = new Path(getCurrentMethodName());
+
+         // Store the UNSORTED table1 as the first union input
+         String pathSort1 = newPath.toString() + "1";
+         pigJob = pigServer.store("table1", pathSort1, TableStorer.class.getCanonicalName() +
+             "('" + TABLE1_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         // Store the sorted table2 as the second union input
+         String pathSort2 = newPath.toString() + "2";
+         pigJob = pigServer.store("sort2", pathSort2, TableStorer.class.getCanonicalName() +
+             "('" + TABLE2_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         String queryLoad = "records1 = LOAD '"
+             + pathSort1 + ","
+             + pathSort2
+             + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,source_table,b,c,d,e,f,m1', 'sorted');";
+         pigServer.registerQuery(queryLoad);
+
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("The table is not (properly) sorted"));
+ }
+
+ /**
+  * Negative test: sorted union where the right (second) table was stored
+  * unsorted. The sorted load is expected to fail with
+  * "The table is not (properly) sorted".
+  */
+ @Test
+ public void union_error_unsorted_right() throws ExecException {
+     IOException exception = null;
+
+     try {
+         // Sort only table1; table2 is stored unsorted on purpose
+         String orderby1 = "sort1 = ORDER table1 BY a ;";
+         pigServer.registerQuery(orderby1);
+
+         Path newPath = new Path(getCurrentMethodName());
+
+         // Store the sorted table1 as the first union input
+         String pathSort1 = newPath.toString() + "1";
+         pigJob = pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+             "('" + TABLE1_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         // Store the UNSORTED table2 as the second union input
+         String pathSort2 = newPath.toString() + "2";
+         pigJob = pigServer.store("table2", pathSort2, TableStorer.class.getCanonicalName() +
+             "('" + TABLE2_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         String queryLoad = "records1 = LOAD '"
+             + pathSort1 + ","
+             + pathSort2
+             + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,source_table,b,c,d,e,f,m1', 'sorted');";
+         pigServer.registerQuery(queryLoad);
+
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("The table is not (properly) sorted"));
+ }
+
+ /**
+  * Negative test: sorted union of three tables where the middle table was
+  * stored unsorted. The sorted load is expected to fail with
+  * "The table is not (properly) sorted".
+  */
+ @Test
+ public void union_error_unsorted_middle() throws ExecException {
+     IOException exception = null;
+
+     try {
+         // Sort table1 and table3 on "e"; table2 is stored unsorted on purpose
+         String orderby1 = "sort1 = ORDER table1 BY e ;";
+         pigServer.registerQuery(orderby1);
+
+         String orderby3 = "sort3 = ORDER table3 BY e ;";
+         pigServer.registerQuery(orderby3);
+
+         Path newPath = new Path(getCurrentMethodName());
+
+         // Store the sorted table1
+         String pathSort1 = newPath.toString() + "1";
+         pigJob = pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+             "('" + TABLE1_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         // Store the UNSORTED table2 as the middle union input
+         String pathSort2 = newPath.toString() + "2";
+         pigJob = pigServer.store("table2", pathSort2, TableStorer.class.getCanonicalName() +
+             "('" + TABLE2_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         // Store the sorted table3
+         String pathSort3 = newPath.toString() + "3";
+         pigJob = pigServer.store("sort3", pathSort3, TableStorer.class.getCanonicalName() +
+             "('" + TABLE3_STORAGE + "')");
+         Assert.assertNull(pigJob.getException());
+
+         String queryLoad = "records1 = LOAD '"
+             + pathSort1 + ","
+             + pathSort2 + ","
+             + pathSort3
+             + "' USING org.apache.hadoop.zebra.pig.TableLoader('e,source_table,a,b,c,d,f,m1,str1', 'sorted');";
+         pigServer.registerQuery(queryLoad);
+
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("The table is not (properly) sorted"));
+ }
+
+ /**
+  * Negative test: sorted union where each input table is sorted on a
+  * different key (a, b, e). Expects failure containing
+  * "does not exist in schema".
+  */
+ @Test
+ public void union_error_sort_key() throws ExecException {
+     IOException exception = null;
+
+     // Input tables mapped to their (deliberately mismatched) sort keys
+     Map<String, String> inputTables = new HashMap<String, String>();
+     inputTables.put("table1", "a");
+     inputTables.put("table2", "b");
+     inputTables.put("table3", "e");
+
+     try {
+         // Test with input tables and provided output columns
+         testOrderPreserveUnion(inputTables, "source_table,a,b,e");
+
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("does not exist in schema"));
+ }
+
+ /**
+  * Negative test: sorted union where the tables share only the leading sort
+  * key ("a,e" vs "a,b"). Expects failure containing
+  * "The table is not (properly) sorted".
+  */
+ @Test
+ public void union_error_sort_key_partial() throws ExecException {
+     IOException exception = null;
+
+     // Input tables mapped to their (partially overlapping) sort keys
+     Map<String, String> inputTables = new HashMap<String, String>();
+     inputTables.put("table1", "a,e");
+     inputTables.put("table2", "a,b");
+
+     try {
+         // Test with input tables and provided output columns
+         testOrderPreserveUnion(inputTables, "a,e,b");
+
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("The table is not (properly) sorted"));
+ }
+
+ /**
+  * Negative test: store after a Pig FOREACH that keeps the column order.
+  * The store after FOREACH records no sort info, so the subsequent sorted
+  * union load is expected to fail with "The table is not (properly) sorted".
+  */
+ @Test
+ public void test_pig_foreach_error() throws ExecException, IOException {
+     IOException exception = null;
+
+     // Sort both tables on "a"
+     String orderby1 = "sort1 = ORDER table1 BY a ;";
+     pigServer.registerQuery(orderby1);
+
+     String orderby2 = "sort2 = ORDER table2 BY a ;";
+     pigServer.registerQuery(orderby2);
+
+     // FOREACH that projects every column unchanged; the store of this alias
+     // is the one that loses the sort metadata
+     String foreach2 = "foreachsort2 = FOREACH sort2 GENERATE a as a, b as b, c as c, d as d, e as e, f as f, m1 as m1;";
+     pigServer.registerQuery(foreach2);
+
+     Path newPath = new Path(getCurrentMethodName());
+
+     // Store the sorted table1
+     String pathSort1 = newPath.toString() + "1";
+     pigJob = pigServer.store("sort1", pathSort1, TableStorer.class.getCanonicalName() +
+         "('" + TABLE1_STORAGE + "')");
+     Assert.assertNull(pigJob.getException());
+
+     // Store the FOREACH result (sorted data, but no sort info recorded)
+     String pathSort2 = newPath.toString() + "2";
+     pigJob = pigServer.store("foreachsort2", pathSort2, TableStorer.class.getCanonicalName() +
+         "('" + TABLE2_STORAGE + "')");
+     Assert.assertNull(pigJob.getException());
+
+     try {
+         String queryLoad = "records1 = LOAD '"
+             + pathSort1 + ","
+             + pathSort2
+             + "' USING org.apache.hadoop.zebra.pig.TableLoader('a,source_table,b,c,d,e,f,m1', 'sorted');";
+
+         pigServer.registerQuery(queryLoad);
+     } catch (IOException e) {
+         exception = e;
+     }
+
+     // Assert after the try/catch (not in a finally block) so an unexpected
+     // runtime failure above is not masked by the assertion error.
+     Assert.assertNotNull(exception);
+     Assert.assertTrue(getStackTrace(exception).contains("The table is not (properly) sorted"));
+ }
+
+ /**
+  * Render a throwable's full stack trace (including nested causes) as a
+  * string. Returns an empty string when the argument is null.
+  *
+  * @param throwable exception to render; may be null
+  * @return the printed stack trace, or "" for null input
+  */
+ private static String getStackTrace(Throwable throwable) {
+     StringWriter buffer = new StringWriter();
+     if (throwable != null) {
+         throwable.printStackTrace(new PrintWriter(buffer));
+     }
+     return buffer.toString();
+ }
+
+ /**
+  * Dump every row of a Pig relation to stdout (debugging aid).
+  *
+  * @param tablename alias of the relation to iterate
+  * @return the number of rows printed
+  */
+ private int printTable(String tablename) throws IOException {
+     int rowCount = 0;
+     for (Iterator<Tuple> rows = pigServer.openIterator(tablename); rows.hasNext(); ) {
+         Tuple row = rows.next();
+         ++rowCount;
+         System.out.println();
+         for (int col = 0; col < row.size(); ++col) {
+             System.out.println("DEBUG: " + tablename + " RowValue.get(" + col + ") = " + row.get(col));
+         }
+     }
+     System.out.println("\nRow count : " + rowCount);
+     return rowCount;
+ }
+
+ /**
+  * Return the fully-qualified name ("package.Class.method") of the method
+  * that called getCurrentMethodName. Used to derive per-test storage paths.
+  *
+  * @return caller's class name and method name joined by '.'
+  */
+ private String getCurrentMethodName() {
+     // Walk the structured stack trace instead of printing it to a stream and
+     // parsing the text, which was fragile against formatting changes.
+     // getStackTrace()[0] is this frame; [1] is the caller we want.
+     StackTraceElement caller = new Throwable().getStackTrace()[1];
+     return caller.getClassName() + "." + caller.getMethodName();
+ }
+
+}