You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/06 16:30:26 UTC
svn commit: r1707064 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/
test/org/apache/pig/test/
Author: rohini
Date: Tue Oct 6 14:30:26 2015
New Revision: 1707064
URL: http://svn.apache.org/viewvc?rev=1707064&view=rev
Log:
PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
pig/trunk/test/org/apache/pig/test/TestFRJoin.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 6 14:30:26 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-3957: Refactor out resetting input key in TezDagBuilder (rohini)
+
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
PIG-4635: NPE while running pig script in tez mode (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Oct 6 14:30:26 2015
@@ -80,7 +80,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -94,12 +93,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
@@ -585,33 +581,6 @@ public class TezDagBuilder extends TezOp
//POShuffleTezLoad accesses the comparator setting
selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
- } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) {
- POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0);
- // TODO Need to fix multiple input key mapping
- TezOperator identityInOutPred = null;
- for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
- if (!pred.isSampleAggregation()) {
- identityInOutPred = pred;
- break;
- }
- }
- identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
- } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) {
- POValueInputTez valueInput = (POValueInputTez) roots.get(0);
-
- LinkedList<String> scalarInputs = new LinkedList<String>();
- for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) {
- if (userFunc.getFunc() instanceof ReadScalarsTez) {
- scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]);
- }
- }
- // Make sure we don't find the scalar
- for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
- if (!scalarInputs.contains(pred.getOperatorKey().toString())) {
- valueInput.setInputKey(pred.getOperatorKey().toString());
- break;
- }
- }
}
setOutputFormat(job);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Oct 6 14:30:26 2015
@@ -294,6 +294,7 @@ public class TezCompiler extends PhyPlan
storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
} else {
POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ output.setScalarOutput(true);
output.addOutputKey(tezOp.getOperatorKey().toString());
from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
from.plan.addAsLeaf(output);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Tue Oct 6 14:30:26 2015
@@ -120,12 +120,10 @@ public class POValueInputTez extends Phy
}
hasNext = shuffleReader.next();
}
- } else {
- if (reader.next()) {
- Tuple origTuple = (Tuple)reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
- }
+ } else if (reader.next()) {
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
}
finished = true;
// For certain operators (such as STREAM), we could still have some work
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java Tue Oct 6 14:30:26 2015
@@ -53,6 +53,8 @@ public class POValueOutputTez extends Ph
private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+ private boolean scalarOutput;
+ private transient Object scalarValue;
private boolean taskIndexWithRecordIndexAsKey;
// TODO Change this to outputKey and write only once
// when a shared edge support is available in Tez
@@ -71,6 +73,14 @@ public class POValueOutputTez extends Ph
super(k);
}
+ public boolean isScalarOutput() {
+ return scalarOutput;
+ }
+
+ public void setScalarOutput(boolean scalarOutput) {
+ this.scalarOutput = scalarOutput;
+ }
+
public boolean isTaskIndexWithRecordIndexAsKey() {
return taskIndexWithRecordIndexAsKey;
}
@@ -149,14 +159,25 @@ public class POValueOutputTez extends Ph
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
+ if (scalarOutput) {
+ if (scalarValue == null) {
+ scalarValue = inp.result;
+ } else {
+ String msg = "Scalar has more than one row in the output. "
+ + "1st : " + scalarValue + ", 2nd :"
+ + inp.result
+ + " (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )";
+ throw new ExecException(msg);
+ }
+ }
+ if (taskIndexWithRecordIndexAsKey) {
+ Tuple tuple = tupleFactory.newTuple(2);
+ tuple.set(0, taskIndex);
+ tuple.set(1, count++);
+ key = tuple;
+ }
for (KeyValueWriter writer : writers) {
try {
- if (taskIndexWithRecordIndexAsKey) {
- Tuple tuple = tupleFactory.newTuple(2);
- tuple.set(0, taskIndex);
- tuple.set(1, count++);
- key = tuple;
- }
writer.write(key, inp.result);
} catch (IOException e) {
throw new ExecException(e);
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=1707064&r1=1707063&r2=1707064&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin.java Tue Oct 6 14:30:26 2015
@@ -437,7 +437,7 @@ public class TestFRJoin {
Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
{
pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';");
- pigServer.registerQuery("D = join A by $1 left, B by $1 using 'replicated';");
+ pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
pigServer.registerQuery("E = union C,D;");
Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -470,14 +470,14 @@ public class TestFRJoin {
public void testFRJoinOut9() throws IOException {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+ pigServer.registerQuery("C = UNION A, B;");
+ pigServer.registerQuery("D = FILTER C BY x == 1;");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance()
.newDefaultBag();
Map<String, Tuple> hashFRJoin = new HashMap<String, Tuple>();
Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
{
- pigServer.registerQuery("C = join A by $0 left, B by $0 using 'repl';");
- pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
- pigServer.registerQuery("E = union C,D;");
+ pigServer.registerQuery("E = join C by $0 left, D by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("E");
while (iter.hasNext()) {
@@ -489,9 +489,7 @@ public class TestFRJoin {
}
}
{
- pigServer.registerQuery("C = join A by $0 left, B by $0;");
- pigServer.registerQuery("D = join A by $1 left, B by $1;");
- pigServer.registerQuery("E = union C,D;");
+ pigServer.registerQuery("E = join C by $0 left, D by $0;");
Iterator<Tuple> iter = pigServer.openIterator("E");
while (iter.hasNext()) {
Tuple tuple = iter.next();