You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/19 02:16:33 UTC

svn commit: r986990 - /hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java

Author: thejas
Date: Thu Aug 19 00:16:33 2010
New Revision: 986990

URL: http://svn.apache.org/viewvc?rev=986990&view=rev
Log:
adding file missing from checkin for PIG-1461

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=986990&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Thu Aug 19 00:16:33 2010
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+
+public class TestUnionOnSchema  {
+    /** Mini cluster shared by all tests; assigned in oneTimeSetup() and shut down in oneTimeTearDown(). */
+    static MiniCluster cluster ;
+    /** Name of the directory that oneTimeSetup() creates on the cluster file system. */
+    private static final String EMPTY_DIR = "emptydir";
+    /** Input file holding two rows of two tab-separated numbers. */
+    private static final String INP_FILE_2NUMS = "input1";
+    /** Input file holding two rows of five tab-separated fields: number, string, number, bag, tuple. */
+    private static final String INP_FILE_2NUM_1CHAR_1BAG = "input2";
+    
+    /**
+     * Runs before every test and calls FileLocalizer.setInitialized(false).
+     * NOTE(review): presumably this makes FileLocalizer re-initialize for the
+     * PigServer each test creates - confirm against FileLocalizer.
+     * @throws Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+        FileLocalizer.setInitialized(false);
+    }
+
+
+    /**
+     * Per-test cleanup hook; currently does nothing.
+     * @throws Exception
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+    
+    /**
+     * One-time fixture setup: builds the shared mini cluster, creates
+     * {@link #EMPTY_DIR} on its file system, writes the two tab-separated
+     * input files locally and copies each of them to the cluster.
+     *
+     * @throws IOException if an input file cannot be written
+     * @throws Exception if the empty directory cannot be created
+     */
+    @BeforeClass
+    public static void oneTimeSetup() throws IOException, Exception {
+        cluster = MiniCluster.buildCluster();
+        FileSystem fs = cluster.getFileSystem();
+        if (!fs.mkdirs(new Path(EMPTY_DIR))) {
+            throw new Exception("failed to create empty dir");
+        }
+
+        // first input file: two numeric columns
+        writeLocalInputFile(INP_FILE_2NUMS,
+                "1\t2",
+                "5\t3");
+        Util.copyFromLocalToCluster(cluster, INP_FILE_2NUMS, INP_FILE_2NUMS);
+
+        // 2nd input file: number, string, number, bag, tuple
+        writeLocalInputFile(INP_FILE_2NUM_1CHAR_1BAG,
+                "1\tabc\t2\t{(1,a),(1,b)}\t(1,c)",
+                "5\tdef\t3\t{(2,a),(2,b)}\t(2,c)");
+        Util.copyFromLocalToCluster(cluster, INP_FILE_2NUM_1CHAR_1BAG, INP_FILE_2NUM_1CHAR_1BAG);
+    }
+
+    /**
+     * Writes the given lines to a local file, one line each, replacing any
+     * existing content.
+     *
+     * @param fileName path of the local file to (re)create
+     * @param lines the lines to write, in order
+     * @throws IOException if the file cannot be opened or a write/close error
+     *         was recorded by the writer
+     */
+    private static void writeLocalInputFile(String fileName, String... lines)
+            throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(fileName));
+        try {
+            for (String line : lines) {
+                w.println(line);
+            }
+        } finally {
+            // always release the file handle, even if writing was interrupted
+            w.close();
+        }
+        // PrintWriter never throws on write or close failures, it only records
+        // them; surface them here so a truncated fixture is not used silently
+        if (w.checkError()) {
+            throw new IOException("failed to write input file " + fileName);
+        }
+    }
+    
+    /**
+     * One-time fixture cleanup: deletes the two local input files
+     * (best effort, the result of the delete is not checked) and shuts the
+     * mini cluster down.
+     *
+     * @throws Exception
+     */
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        try {
+            new File(INP_FILE_2NUMS).delete();
+            new File(INP_FILE_2NUM_1CHAR_1BAG).delete();
+        } finally {
+            // JUnit runs @AfterClass even when @BeforeClass failed, in which
+            // case the cluster may never have been assigned
+            if (cluster != null) {
+                cluster.shutDown();
+            }
+        }
+    }
+ 
+
+    /**
+     * Verifies UNION ONSCHEMA when both inputs load the same file with the
+     * identical schema (i : int, j : int): the expected result holds the
+     * two input rows twice.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaSameSchema() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.LOCAL);
+
+        // both relations share the very same load clause
+        final String loadClause = " = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);";
+        final String script = "  l1" + loadClause + "l2" + loadClause + "u = union onschema l1, l2;";
+        Util.registerMultiLineQuery(pigServer, script);
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = { "(1,2)", "(5,3)", "(1,2)", "(5,3)" };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+    
+    
+
+    /**
+     * Verifies numeric type promotion in UNION ONSCHEMA: both inputs use the
+     * column names i and j, but declare them as (int, double) and
+     * (long, float). The expected rows are written as (1L,2.0) and (5L,3.0).
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaDiffNumType() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.LOCAL);
+
+        final String loadPrefix = " = load '" + INP_FILE_2NUMS + "' as ";
+        final String script =
+            "  l1" + loadPrefix + "(i : int, j : double);"
+            + "l2" + loadPrefix + "(i : long, j : float);"
+            + "u = union onschema l1, l2;";
+        Util.registerMultiLineQuery(pigServer, script);
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = { "(1L,2.0)", "(5L,3.0)", "(1L,2.0)", "(5L,3.0)" };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+    
+    
+
+    /**
+     * Verifies UNION ONSCHEMA when the inputs share no column name:
+     * (i, j) on one side and (x, y) on the other. Each expected row has four
+     * fields, with null in the two columns its own input does not declare.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaNoCommonCols() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.LOCAL);
+
+        final StringBuilder script = new StringBuilder();
+        script.append("  l1 = load '").append(INP_FILE_2NUMS).append("' as (i : int, j : int);");
+        script.append("l2 = load '").append(INP_FILE_2NUMS).append("' as (x : long, y : float);");
+        script.append("u = union onschema l1, l2;");
+        Util.registerMultiLineQuery(pigServer, script.toString());
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = {
+            "(1,2,null,null)",
+            "(5,3,null,null)",
+            "(null,null,1L,2.0F)",
+            "(null,null,5L,3.0F)"
+        };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+    
+    /**
+     * Verifies UNION ONSCHEMA when the second input declares columns the
+     * first one lacks (a chararray, a bag and a tuple). Unlike the other
+     * tests this one runs in MAPREDUCE mode against the mini cluster.
+     * Expected rows from the first input carry null in the three extra
+     * columns.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaAdditionalColumn() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        final String narrowLoad =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);";
+        final String wideSchema =
+            "  (i : long, c : chararray, j : int "
+            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+            + ", t : tuple (tc1 : int, tc2 : chararray) );";
+        final String wideLoad =
+            "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + wideSchema;
+        Util.registerMultiLineQuery(pigServer, narrowLoad + wideLoad + "u = union onschema l1, l2;");
+
+        // print the plan for the union to stdout
+        pigServer.explain("u", System.out);
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = {
+            "(1L,2,null,null,null)",
+            "(5L,3,null,null,null)",
+            "(1L,2,'abc',{(1,'a'),(1,'b')},(1,'c'))",
+            "(5L,3,'def',{(2,'a'),(2,'b')},(2,'c'))"
+        };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+    
+    
+    /**
+     * Verifies UNION ONSCHEMA over three inputs whose schemas overlap only
+     * partly: (i, j), (i, x) and (i, c, j, b), with i declared as int,
+     * double and long respectively. Every expected row has five fields and
+     * a double value for i.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchema3Inputs() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.LOCAL);
+
+        final String firstLoad =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); ";
+        final String secondLoad =
+            "l2 = load '" + INP_FILE_2NUMS + "' as (i : double, x : int); ";
+        final String thirdLoad =
+            "l3 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : long, c : chararray, j : int "
+            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); ";
+        final String script = firstLoad + secondLoad + thirdLoad + "u = union onschema l1, l2, l3;";
+        Util.registerMultiLineQuery(pigServer, script);
+
+        // print the plan for the union to stdout
+        pigServer.explain("u", System.out);
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = {
+            "(1.0,2,null,null,null)",
+            "(5.0,3,null,null,null)",
+            "(1.0,null,2,null,null)",
+            "(5.0,null,3,null,null)",
+            "(1.0,2,null,'abc',{(1,'a'),(1,'b')})",
+            "(5.0,3,null,'def',{(2,'a'),(2,'b')})"
+        };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+
+    /**
+     * Verifies UNION ONSCHEMA when one input declares all of its columns as
+     * bytearray while the other loads the same file with typed columns.
+     * The expected tuples are built from constant strings; their second
+     * field, where non-null, is then replaced by a DataByteArray because the
+     * constant strings spell it as a chararray.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaByteArrayConversions() throws IOException, ParseException {
+        final PigServer pigServer = new PigServer(ExecType.LOCAL);
+
+        final String untypedLoad =
+            " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : bytearray, x : bytearray, j : bytearray "
+            + ", b : bytearray); ";
+        final String typedLoad =
+            "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : long, c : chararray, j : int "
+            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); ";
+        Util.registerMultiLineQuery(pigServer, untypedLoad + typedLoad + "u = union onSchema l1, l2;");
+
+        // print the plan for the union to stdout
+        pigServer.explain("u", System.out);
+
+        final Iterator<Tuple> actual = pigServer.openIterator("u");
+        final String[] expectedRows = {
+            "(1L,null,2,{(1,'a'),(1,'b')},'abc')",
+            "(1L,'abc',2,{(1,'a'),(1,'b')},null)",
+            "(5L,null,3,{(2,'a'),(2,'b')},'def')",
+            "(5L,'def',3,{(2,'a'),(2,'b')},null)"
+        };
+        final List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+
+        // swap the chararray in the 2nd field for the equivalent bytearray
+        for (Tuple row : expected) {
+            final Object second = row.get(1);
+            if (second == null) {
+                continue;
+            }
+            row.set(1, new DataByteArray(second.toString()));
+        }
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+    
+    /**
+     * Negative test: UNION ONSCHEMA must be rejected when an input has no
+     * schema. Checked once with only the first input lacking a schema and
+     * once with both inputs lacking one.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaNoSchema() throws IOException, ParseException {
+        final String expectedErr =
+            ".*UNION ONSCHEMA cannot be used with relations that have null schema.*";
+
+        final String schemalessFirstLoad = "  l1 = load '" + INP_FILE_2NUMS + "' ;";
+        final String unionStmt = "u = union onschema l1, l2;";
+        final String[] secondLoads = {
+            "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);",
+            "l2 = load '" + INP_FILE_2NUMS + "' ;"
+        };
+
+        for (String secondLoad : secondLoads) {
+            checkSchemaEx(schemalessFirstLoad + secondLoad + unionStmt, expectedErr);
+        }
+    }
+
+
+    /**
+     * Registers {@code query} on a new local-mode PigServer and fails the
+     * test unless registration throws a FrontendException whose message
+     * matches {@code expectedErr}.
+     *
+     * @param query statements that are expected to be rejected
+     * @param expectedErr regular expression that the entire exception
+     *        message must match
+     * @throws IOException
+     */
+    private void checkSchemaEx(String query, String expectedErr) throws IOException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+
+        boolean foundEx = false;
+        try {
+            Util.registerMultiLineQuery(pig, query);
+        } catch (FrontendException e) {
+            foundEx = true;
+            String actualMsg = e.getMessage();
+            // an exception without a message is a mismatch; report it through
+            // fail() instead of letting matches() throw a NullPointerException
+            if (actualMsg == null || !actualMsg.matches(expectedErr)) {
+                String msg = "Expected exception message matching '"
+                    + expectedErr + "' but got '" + actualMsg + "'" ;
+                fail(msg);
+            }
+        }
+
+        if (!foundEx) {
+            fail("No exception thrown. Exception is expected.");
+        }
+    }
+    
+
+    /**
+     * Negative test: UNION ONSCHEMA must be rejected when a column shared by
+     * both inputs has types that cannot be merged. Covers top-level columns
+     * (chararray vs float, long vs map) and inner schemas of bag and tuple
+     * columns (different inner names, different inner types).
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaIncompatibleTypes() throws IOException, ParseException {
+        // every query has the same shape; only the two schemas vary
+        final String loadL1 = "  l1 = load '" + INP_FILE_2NUMS + "' as ";
+        final String loadL2 = "l2 = load '" + INP_FILE_2NUMS + "' as ";
+        final String unionStmt = "u = union onschema l1, l2;";
+
+        final String topLevelErr = ".*Incompatible types for merging schemas.*";
+
+        // chararray vs float
+        checkSchemaEx(
+            loadL1 + "(x : long, y : chararray);"
+            + loadL2 + "(x : long, y : float);"
+            + unionStmt,
+            topLevelErr);
+
+        // long vs map
+        checkSchemaEx(
+            loadL1 + "(x : long, y : chararray);"
+            + loadL2 + "(x : map[ ], y : chararray);"
+            + unionStmt,
+            topLevelErr);
+
+        // Test error on different inner schemas
+        final String innerErr = ".*Incompatible types for merging inner schemas.*";
+
+        // bag column with different internal column names
+        checkSchemaEx(
+            loadL1 + "(x : long, b : bag { t : tuple (c1 : int, c2 : chararray)}  );"
+            + loadL2 + "(x : long, b : bag { t : tuple (c2 : int, c3 : chararray)} );"
+            + unionStmt,
+            innerErr);
+
+        // bag column with different internal column types
+        checkSchemaEx(
+            loadL1 + "(x : long, b : bag { t : tuple (c1 : int, c2 : chararray)}  );"
+            + loadL2 + "(x : long, b : bag { t : tuple (c1 : long, c2 : chararray)} );"
+            + unionStmt,
+            innerErr);
+
+        // tuple column with different internal column types
+        checkSchemaEx(
+            loadL1 + "(t : tuple (c1 : int, c2 : chararray)  );"
+            + loadL2 + "(t : tuple (c1 : long, c2 : chararray) );"
+            + unionStmt,
+            innerErr);
+    }
+    
+    
+
+    
+    
+    
+    
+}