You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/01 08:44:43 UTC
svn commit: r1628616 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/newplan/logical/relational/ test/org/apac...
Author: daijy
Date: Wed Oct 1 06:44:42 2014
New Revision: 1628616
URL: http://svn.apache.org/r1628616
Log:
PIG-4175: PIG CROSS operation follow by STORE produces non-deterministic results each run
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 1 06:44:42 2014
@@ -92,6 +92,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4175: PIG CROSS operation follow by STORE produces non-deterministic results each run (daijy)
+
PIG-4202: Reset UDFContext state before OutputCommitter invocations in Tez (rohini)
PIG-4205: e2e test property-check does not check all prerequisites (kellyzly via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Oct 1 06:44:42 2014
@@ -640,16 +640,11 @@ public class JobControlCompiler{
for (String udf : mro.UDFs) {
if (udf.contains("GFCross")) {
- Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+ Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
if (func instanceof GFCross) {
String crossKey = ((GFCross)func).getCrossKey();
- // If non GFCross has been processed yet
- if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
- pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- Integer.toString(nwJob.getNumReduceTasks()));
- }
conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
- (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
+ Integer.toString(mro.getRequestedParallelism()));
}
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Oct 1 06:44:42 2014
@@ -1076,7 +1076,14 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
- nonBlocking(op);
+ if (op.isMapSideOnly() && curMROp.isMapDone()) {
+ FileSpec fSpec = getTempFileSpec();
+ MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+ curMROp = startNew(fSpec, prevMROper);
+ curMROp.mapPlan.addAsLeaf(op);
+ } else {
+ nonBlocking(op);
+ }
List<PhysicalPlan> plans = op.getInputPlans();
if(plans!=null)
for (PhysicalPlan plan : plans) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Oct 1 06:44:42 2014
@@ -92,6 +92,10 @@ public class POForEach extends PhysicalO
protected Tuple inpTuple;
+ // Indicates that the foreach statement can only run on the map side
+ // Currently only used in MR cross (See PIG-4175)
+ protected boolean mapSideOnly = false;
+
private Schema schema;
public POForEach(OperatorKey k) {
@@ -782,4 +786,12 @@ public class POForEach extends PhysicalO
return (Tuple) out;
}
}
+
+ public void setMapSideOnly(boolean mapSideOnly) {
+ this.mapSideOnly = mapSideOnly;
+ }
+
+ public boolean isMapSideOnly() {
+ return mapSideOnly;
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Oct 1 06:44:42 2014
@@ -634,6 +634,7 @@ public class LogToPhyTranslationVisitor
List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
+ fe.setMapSideOnly(true);
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
currentPlan.connect(logToPhyMap.get(op), fe);
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1628616&r1=1628615&r2=1628616&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Wed Oct 1 06:44:42 2014
@@ -31,10 +31,15 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -44,11 +49,14 @@ import org.apache.pig.data.DefaultBagFac
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.test.utils.Identity;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
@@ -1604,4 +1612,47 @@ public class TestEvalPipeline2 {
Assert.assertFalse(iter.hasNext());
}
+
+ @Test
+ public void testCrossAfterGroupAll() throws Exception{
+ String[] input = {
+ "1\tA",
+ "2\tB",
+ "3\tC",
+ "4\tD",
+ };
+
+ Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input);
+
+ try {
+ pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
+ pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0, a1);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A);");
+ pigServer.registerQuery("D = cross A, C;");
+ Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
+ ExecJob job = pigServer.store("D", output.toString());
+ FileSystem fs = output.getFileSystem(cluster.getConfiguration());
+ FileStatus[] partFiles = fs.listStatus(output, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+ Assert.assertTrue(partFiles.length >= 2);
+ // Check the count of output
+ Iterator<Tuple> iter = job.getResults();
+ iter.next();
+ iter.next();
+ iter.next();
+ iter.next();
+ Assert.assertFalse(iter.hasNext());
+ } finally {
+ pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
+ }
+ }
}