You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2016/05/31 19:47:20 UTC

svn commit: r1746332 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java test/org/apache/pig/tez/TestTezGraceParallelism.java

Author: daijy
Date: Tue May 31 19:47:20 2016
New Revision: 1746332

URL: http://svn.apache.org/viewvc?rev=1746332&view=rev
Log:
PIG-4786: CROSS will not work correctly with Grace Parallelism

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1746332&r1=1746331&r2=1746332&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 31 19:47:20 2016
@@ -141,6 +141,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4786: CROSS will not work correctly with Grace Parallelism (daijy)
+
 PIG-3227: SearchEngineExtractor does not work for bing (dannyant via daijy)
 
 PIG-4902: Fix UT failures on 0.16 branch: TestTezGraceParallelism, TestPigScriptParser (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1746332&r1=1746331&r2=1746332&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue May 31 19:47:20 2016
@@ -807,7 +807,8 @@ public class TezDagBuilder extends TezOp
                     // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                     // reduce the parallelism of the vertex
                     if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
-                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+                            && tezOp.getCrossKeys() == null) {
                         vmPluginName = PigGraceShuffleVertexManager.class.getName();
                         tezOp.setUseGraceParallelism(true);
                         vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());

Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1746332&r1=1746331&r2=1746332&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Tue May 31 19:47:20 2016
@@ -322,4 +322,33 @@ public class TestTezGraceParallelism {
             super.setStoreLocation(location, job);
         }
     }
+
+    @Test
+    // See PIG-4786
+    public void testCross() throws IOException{
+        // scope-90 is the cross vertex. It should not use PigGraceShuffleVertexManager
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testCross", writer, PigGraceShuffleVertexManager.class);
+        File outputDir = File.createTempFile("intemediate", "txt");
+        outputDir.delete();
+        pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize", "3000");
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+        pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("B = order A by name;");
+        pigServer.registerQuery("C = distinct B;");
+        pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D by name;");
+        pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+        pigServer.registerQuery("G = cross C, F;");
+        Iterator<Tuple> iter = pigServer.openIterator("G");
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        assertEquals(count, 400);
+        assertFalse(writer.toString().contains("scope-90"));
+    }
 }