You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/01 08:44:43 UTC

svn commit: r1628616 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/

Author: daijy
Date: Wed Oct  1 06:44:42 2014
New Revision: 1628616

URL: http://svn.apache.org/r1628616
Log:
PIG-4175: PIG CROSS operation follow by STORE produces non-deterministic results each run

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct  1 06:44:42 2014
@@ -92,6 +92,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4175: PIG CROSS operation follow by STORE produces non-deterministic results each run (daijy)
+
 PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)
 
 PIG-4205: e2e test property-check does not check all prerequisites (kellyzly via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Oct  1 06:44:42 2014
@@ -640,16 +640,11 @@ public class JobControlCompiler{
 
             for (String udf : mro.UDFs) {
                 if (udf.contains("GFCross")) {
-                    Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+                    Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
                     if (func instanceof GFCross) {
                         String crossKey = ((GFCross)func).getCrossKey();
-                        // If non GFCross has been processed yet
-                        if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
-                            pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                    Integer.toString(nwJob.getNumReduceTasks()));
-                        }
                         conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
+                                Integer.toString(mro.getRequestedParallelism()));
                     }
                 }
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Oct  1 06:44:42 2014
@@ -1076,7 +1076,14 @@ public class MRCompiler extends PhyPlanV
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
-            nonBlocking(op);
+            if (op.isMapSideOnly() && curMROp.isMapDone()) {
+                FileSpec fSpec = getTempFileSpec();
+                MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+                curMROp = startNew(fSpec, prevMROper);
+                curMROp.mapPlan.addAsLeaf(op);
+            } else {
+                nonBlocking(op);
+            }
             List<PhysicalPlan> plans = op.getInputPlans();
             if(plans!=null)
                 for (PhysicalPlan plan : plans) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Oct  1 06:44:42 2014
@@ -92,6 +92,10 @@ public class POForEach extends PhysicalO
 
     protected Tuple inpTuple;
 
+    // Indicates that the foreach statement can only run on the map side.
+    // Currently only used in MR cross (See PIG-4175)
+    protected boolean mapSideOnly = false;
+
     private Schema schema;
 
     public POForEach(OperatorKey k) {
@@ -782,4 +786,12 @@ public class POForEach extends PhysicalO
             return (Tuple) out;
         }
     }
+
+    public void setMapSideOnly(boolean mapSideOnly) {
+        this.mapSideOnly = mapSideOnly;
+    }
+
+    public boolean isMapSideOnly() {
+        return mapSideOnly;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Oct  1 06:44:42 2014
@@ -634,6 +634,7 @@ public class LogToPhyTranslationVisitor 
                     List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
 
                     POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
+                    fe.setMapSideOnly(true);
                     fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     currentPlan.add(fe);
                     currentPlan.connect(logToPhyMap.get(op), fe);

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Wed Oct  1 06:44:42 2014
@@ -31,10 +31,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -44,11 +49,14 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.test.utils.Identity;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -1604,4 +1612,47 @@ public class TestEvalPipeline2 {
 
         Assert.assertFalse(iter.hasNext());
     }
+
+    @Test
+    public void testCrossAfterGroupAll() throws Exception{
+        String[] input = {
+                "1\tA",
+                "2\tB",
+                "3\tC",
+                "4\tD",
+        };
+
+        Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input);
+
+        try {
+            pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
+            pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0, a1);");
+            pigServer.registerQuery("B = group A all;");
+            pigServer.registerQuery("C = foreach B generate COUNT(A);");
+            pigServer.registerQuery("D = cross A, C;");
+            Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
+            ExecJob job = pigServer.store("D", output.toString());
+            FileSystem fs = output.getFileSystem(cluster.getConfiguration());
+            FileStatus[] partFiles = fs.listStatus(output, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    if (path.getName().startsWith("part")) {
+                        return true;
+                    }
+                    return false;
+                }
+            });
+            // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+            Assert.assertTrue(partFiles.length >= 2);
+            // Check that the output contains exactly 4 tuples
+            Iterator<Tuple> iter = job.getResults();
+            iter.next();
+            iter.next();
+            iter.next();
+            iter.next();
+            Assert.assertFalse(iter.hasNext());
+        } finally {
+            pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
+        }
+    }
 }