You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/02 01:07:05 UTC
svn commit: r1628863 - in /pig/branches/branch-0.14: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
src/org/apache/pig/impl/ src/org/apache/pig...
Author: rohini
Date: Wed Oct 1 23:07:05 2014
New Revision: 1628863
URL: http://svn.apache.org/r1628863
Log:
PIG-4175: PIG CROSS operation followed by STORE produces non-deterministic results each run - additional patch
Modified:
pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java
pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java
pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java
Modified: pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/PigConfiguration.java Wed Oct 1 23:07:05 2014
@@ -280,8 +280,6 @@ public class PigConfiguration {
*/
public static final String PIG_NO_TASK_REPORT = "pig.stats.notaskreport";
- public static final String PIG_CROSS_PARALLELISM_HINT = "pig.cross.parallelism.hint";
-
public static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
public static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg";
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Oct 1 23:07:05 2014
@@ -656,7 +656,7 @@ public class JobControlCompiler{
Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
if (func instanceof GFCross) {
String crossKey = ((GFCross)func).getCrossKey();
- conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+ conf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey,
Integer.toString(mro.getRequestedParallelism()));
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Wed Oct 1 23:07:05 2014
@@ -34,6 +34,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -162,7 +163,7 @@ public class ParallelismSetter extends T
tezOp.setVertexParallelism(parallelism);
if (tezOp.getCrossKey()!=null) {
- pc.getProperties().put(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + tezOp.getCrossKey(),
+ pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + tezOp.getCrossKey(),
Integer.toString(tezOp.getVertexParallelism()));
}
} catch (Exception e) {
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/PigImplConstants.java Wed Oct 1 23:07:05 2014
@@ -58,4 +58,9 @@ public class PigImplConstants {
public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
+
+ /**
+ * Parallelism to be used for CROSS operation by GFCross UDF
+ */
+ public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java Wed Oct 1 23:07:05 2014
@@ -22,13 +22,12 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.UDFContext;
@@ -56,7 +55,7 @@ public class GFCross extends EvalFunc<Da
parallelism = DEFAULT_PARALLELISM;
Configuration cfg = UDFContext.getUDFContext().getJobConf();
if (cfg != null) {
- String s = cfg.get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey);
+ String s = cfg.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey);
if (s == null) {
throw new IOException("Unable to get parallelism hint from job conf");
}
@@ -65,7 +64,7 @@ public class GFCross extends EvalFunc<Da
numInputs = (Integer)input.get(0);
myNumber = (Integer)input.get(1);
-
+
numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
}
@@ -73,21 +72,21 @@ public class GFCross extends EvalFunc<Da
DataBag output = mBagFactory.newDefaultBag();
try{
-
+
int[] digits = new int[numInputs];
digits[myNumber] = r.nextInt(numGroupsPerInput);
for (int i=0; i<numGroupsGoingTo; i++){
output.add(toTuple(digits));
next(digits);
- }
-
+ }
+
return output;
}catch(ExecException e){
throw e;
}
}
-
+
private Tuple toTuple(int[] digits) throws IOException, ExecException{
Tuple t = mTupleFactory.newTuple(numInputs);
for (int i=0; i<numInputs; i++){
@@ -95,7 +94,7 @@ public class GFCross extends EvalFunc<Da
}
return t;
}
-
+
private void next(int[] digits){
for (int i=0; i<numInputs; i++){
if (i== myNumber)
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestEvalPipeline2.java Wed Oct 1 23:07:05 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -55,8 +56,6 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.test.utils.Identity;
-import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
@@ -1613,6 +1612,7 @@ public class TestEvalPipeline2 {
Assert.assertFalse(iter.hasNext());
}
+ @SuppressWarnings("unchecked")
@Test
public void testCrossAfterGroupAll() throws Exception{
String[] input = {
@@ -1626,7 +1626,7 @@ public class TestEvalPipeline2 {
try {
pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
- pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0, a1);");
+ pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate COUNT(A);");
pigServer.registerQuery("D = cross A, C;");
@@ -1644,13 +1644,18 @@ public class TestEvalPipeline2 {
});
// auto-parallelism is 2 in MR, 20 in Tez, so check >=2
Assert.assertTrue(partFiles.length >= 2);
- // Check the count of output
+ // Check the output
Iterator<Tuple> iter = job.getResults();
- iter.next();
- iter.next();
- iter.next();
- iter.next();
- Assert.assertFalse(iter.hasNext());
+ List<Tuple> results = new ArrayList<Tuple>();
+ while (iter.hasNext()) {
+ results.add(iter.next());
+ }
+ Collections.sort(results);
+ Assert.assertEquals(4, results.size());
+ Assert.assertEquals("(1,A,4)", results.get(0).toString());
+ Assert.assertEquals("(2,B,4)", results.get(1).toString());
+ Assert.assertEquals("(3,C,4)", results.get(2).toString());
+ Assert.assertEquals("(4,D,4)", results.get(3).toString());
} finally {
pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
}
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java?rev=1628863&r1=1628862&r2=1628863&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestGFCross.java Wed Oct 1 23:07:05 2014
@@ -20,16 +20,16 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.util.UDFContext;
import org.junit.Test;
public class TestGFCross {
-
+
// Test GFCross returns the correct number of default
// join groups
@Test
@@ -49,7 +49,7 @@ public class TestGFCross {
@Test
public void testSerial() throws Exception {
Configuration cfg = new Configuration();
- cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "1");
+ cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
@@ -65,7 +65,7 @@ public class TestGFCross {
@Test
public void testParallelSet() throws Exception {
Configuration cfg = new Configuration();
- cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "10");
+ cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);