You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2017/04/03 16:43:18 UTC

svn commit: r1790022 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java test/org/apache/pig/test/TestFlatten.java

Author: knoguchi
Date: Mon Apr  3 16:43:18 2017
New Revision: 1790022

URL: http://svn.apache.org/viewvc?rev=1790022&view=rev
Log:
PIG-5209: Cross product on flatten(map) fails with ClassCastException (knoguchi)

Added:
    pig/trunk/test/org/apache/pig/test/TestFlatten.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1790022&r1=1790021&r2=1790022&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr  3 16:43:18 2017
@@ -91,6 +91,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5209: Cross product on flatten(map) fails with ClassCastException (knoguchi)
+
 PIG-5153: Change of behavior in FLATTEN(map) (szita via rohini)
 
 PIG-4677: Display failure information on stop on failure (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1790022&r1=1790021&r2=1790022&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Apr  3 16:43:18 2017
@@ -511,7 +511,11 @@ public class POForEach extends PhysicalO
                             // when the first index which needs to be flattened has reached the
                             // last element in its iterator, we won't come here - instead, we reset
                             // all iterators at the beginning of this method.
-                            its[index] = ((DataBag)bags[index]).iterator();
+                            if(bags[index] instanceof DataBag) {
+                                its[index] = ((DataBag)bags[index]).iterator();
+                            } else { // if (bags[i] instanceof Map)) {
+                                its[index] = ((Map)bags[index]).entrySet().iterator();
+                            }
                             data[index] = its[index].next();
                         }
                     }

Added: pig/trunk/test/org/apache/pig/test/TestFlatten.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFlatten.java?rev=1790022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFlatten.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestFlatten.java Mon Apr  3 16:43:18 2017
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Regression tests for PIG-5209: a FOREACH with more than one FLATTEN must emit
+ * the cross product of the flattened fields, for bags as well as for maps.
+ */
+public class TestFlatten {
+
+    private PigServer pig;
+
+    @Before
+    public void setUp() throws Exception {
+        pig = new PigServer(Util.getLocalTestMode());
+    }
+
+    /** Flattening two bags in one FOREACH yields every pairing of their tuples. */
+    @Test
+    public void testTwoBagFlatten() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                Storage.bag(
+                    Storage.tuple("a","b"),
+                    Storage.tuple("c","d")),
+                Storage.bag(
+                    Storage.tuple("1","2"),
+                    Storage.tuple("3","4"))
+            )
+        );
+        runAndVerify(data,
+            "A = load 'input' using mock.Storage() as (bag1:bag {(a1_1:chararray, a1_2:chararray)}, bag2:bag{(a2_1:chararray, a2_2:chararray)});",
+            "B = foreach A GENERATE FLATTEN(bag1), FLATTEN(bag2);",
+            "bag1::a1_1: chararray,bag1::a1_2: chararray,bag2::a2_1: chararray,bag2::a2_2: chararray");
+    }
+
+    /** Flattening two maps in one FOREACH yields every pairing of their entries. */
+    @Test
+    public void testTwoMapFlatten() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                Storage.map("a","b",
+                            "c","d"),
+                Storage.map("1","2",
+                            "3","4")
+            )
+        );
+        runAndVerify(data,
+            "A = load 'input' using mock.Storage() as (map1:map [chararray], map2:map [chararray]);",
+            "B = foreach A GENERATE FLATTEN(map1), FLATTEN(map2);",
+            "map1::key: chararray,map1::value: chararray,map2::key: chararray,map2::value: chararray");
+    }
+
+    /** Runs the two statements and a store of B into 'output' as one batch, then checks
+     *  job status, output schema and the four expected cross-product rows. */
+    private void runAndVerify(Storage.Data data, String loadQuery, String foreachQuery,
+            String expectedSchema) throws Exception {
+        pig.setBatchOn();
+        pig.registerQuery(loadQuery);
+        pig.registerQuery(foreachQuery);
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        for (ExecJob execJob : pig.executeBatch()) {
+            assertEquals(ExecJob.JOB_STATUS.COMPLETED, execJob.getStatus());
+        }
+        assertEquals(Utils.getSchemaFromString(expectedSchema), data.getSchema("output"));
+        Util.checkQueryOutputs(data.get("output").iterator(), Util.getTuplesFromConstantTupleStrings(
+            new String[] {"('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')"}));
+    }
+}