You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/02 01:07:05 UTC

svn commit: r1628863 - in /pig/branches/branch-0.14: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/impl/ src/org/apache/pig...

Author: rohini
Date: Wed Oct  1 23:07:05 2014
New Revision: 1628863

URL: http://svn.apache.org/r1628863
Log:
PIG-4175: PIG CROSS operation followed by STORE produces non-deterministic results each run - additional patch

Modified:
    pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
    pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java

Modified: pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java Wed Oct  1 23:07:05 2014
@@ -280,8 +280,6 @@ public class PigConfiguration {
      */
     public static final String PIG_NO_TASK_REPORT = "pig.stats.notaskreport";
 
-    public static final String PIG_CROSS_PARALLELISM_HINT = "pig.cross.parallelism.hint";
-
     public static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
     public static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
 }

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Oct  1 23:07:05 2014
@@ -656,7 +656,7 @@ public class JobControlCompiler{
                     Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
                     if (func instanceof GFCross) {
                         String crossKey = ((GFCross)func).getCrossKey();
-                        conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+                        conf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey,
                                 Integer.toString(mro.getRequestedParallelism()));
                     }
                 }

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Wed Oct  1 23:07:05 2014
@@ -34,6 +34,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -162,7 +163,7 @@ public class ParallelismSetter extends T
             tezOp.setVertexParallelism(parallelism);
 
             if (tezOp.getCrossKey()!=null) {
-                pc.getProperties().put(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + tezOp.getCrossKey(),
+                pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + tezOp.getCrossKey(),
                         Integer.toString(tezOp.getVertexParallelism()));
             }
         } catch (Exception e) {

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java Wed Oct  1 23:07:05 2014
@@ -58,4 +58,9 @@ public class PigImplConstants {
     public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
     public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
     public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
+
+    /**
+     * Parallelism to be used for CROSS operation by GFCross UDF
+     */
+    public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
 }

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java Wed Oct  1 23:07:05 2014
@@ -22,13 +22,12 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 
 
@@ -56,7 +55,7 @@ public class GFCross extends EvalFunc<Da
             parallelism = DEFAULT_PARALLELISM;
             Configuration cfg = UDFContext.getUDFContext().getJobConf();
             if (cfg != null) {
-                String s = cfg.get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey);
+                String s = cfg.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey);
                 if (s == null) {
                     throw new IOException("Unable to get parallelism hint from job conf");
                 }
@@ -65,7 +64,7 @@ public class GFCross extends EvalFunc<Da
 
             numInputs = (Integer)input.get(0);
             myNumber = (Integer)input.get(1);
-        
+
             numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
             numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
         }
@@ -73,21 +72,21 @@ public class GFCross extends EvalFunc<Da
         DataBag output = mBagFactory.newDefaultBag();
 
         try{
-               
+
             int[] digits = new int[numInputs];
             digits[myNumber] = r.nextInt(numGroupsPerInput);
 
             for (int i=0; i<numGroupsGoingTo; i++){
                 output.add(toTuple(digits));
                 next(digits);
-            }            
-    
+            }
+
             return output;
         }catch(ExecException e){
             throw e;
         }
     }
-    
+
     private Tuple toTuple(int[] digits) throws IOException, ExecException{
         Tuple t = mTupleFactory.newTuple(numInputs);
         for (int i=0; i<numInputs; i++){
@@ -95,7 +94,7 @@ public class GFCross extends EvalFunc<Da
         }
         return t;
     }
-    
+
     private void next(int[] digits){
         for (int i=0; i<numInputs; i++){
             if (i== myNumber)

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java Wed Oct  1 23:07:05 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -55,8 +56,6 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.test.utils.Identity;
-import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -1613,6 +1612,7 @@ public class TestEvalPipeline2 {
         Assert.assertFalse(iter.hasNext());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testCrossAfterGroupAll() throws Exception{
         String[] input = {
@@ -1626,7 +1626,7 @@ public class TestEvalPipeline2 {
 
         try {
             pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
-            pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0, a1);");
+            pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);");
             pigServer.registerQuery("B = group A all;");
             pigServer.registerQuery("C = foreach B generate COUNT(A);");
             pigServer.registerQuery("D = cross A, C;");
@@ -1644,13 +1644,18 @@ public class TestEvalPipeline2 {
             });
             // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
             Assert.assertTrue(partFiles.length >= 2);
-            // Check the count of output
+            // Check the output
             Iterator<Tuple> iter = job.getResults();
-            iter.next();
-            iter.next();
-            iter.next();
-            iter.next();
-            Assert.assertFalse(iter.hasNext());
+            List<Tuple> results = new ArrayList<Tuple>();
+            while (iter.hasNext()) {
+                results.add(iter.next());
+            }
+            Collections.sort(results);
+            Assert.assertEquals(4, results.size());
+            Assert.assertEquals("(1,A,4)", results.get(0).toString());
+            Assert.assertEquals("(2,B,4)", results.get(1).toString());
+            Assert.assertEquals("(3,C,4)", results.get(2).toString());
+            Assert.assertEquals("(4,D,4)", results.get(3).toString());
         } finally {
             pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
         }

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java Wed Oct  1 23:07:05 2014
@@ -20,16 +20,16 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.Test;
 
 public class TestGFCross {
-    
+
     // Test GFCross returns the correct number of default
     // join groups
     @Test
@@ -49,7 +49,7 @@ public class TestGFCross {
     @Test
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
-        cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "1");
+        cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -65,7 +65,7 @@ public class TestGFCross {
     @Test
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
-        cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "10");
+        cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);