You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/26 18:51:57 UTC
svn commit: r1581967 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: POLocalRearrangeTez.java POVertexGroupInputTez.java TezCompiler.java TezDagBuilder.java
Author: cheolsoo
Date: Wed Mar 26 17:51:56 2014
New Revision: 1581967
URL: http://svn.apache.org/r1581967
Log:
PIG-3743: Use VertexGroup and Alias vertex for union (commit PIG-3743-fix-skew-2.patch)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Wed Mar 26 17:51:56 2014
@@ -68,7 +68,6 @@ public class POLocalRearrangeTez extends
this.isSkewedJoin = copyTez.isSkewedJoin;
this.outputKey = copyTez.outputKey;
}
-
}
public String getOutputKey() {
@@ -141,9 +140,10 @@ public class POLocalRearrangeTez extends
PigNullableWritable key = null;
NullableTuple val = null;
if (isUnion) {
- // Use the entire tuple as both key and value
+ // Use the whole tuple as key and set value to null
key = HDataType.getWritableComparableTypes(result.get(1), keyType);
- val = new NullableTuple((Tuple)result.get(1));
+ val = new NullableTuple();
+ val.setNull(true);
} else {
key = HDataType.getWritableComparableTypes(result.get(1), keyType);
val = new NullableTuple((Tuple)result.get(2));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java Wed Mar 26 17:51:56 2014
@@ -65,7 +65,7 @@ public class POVertexGroupInputTez exten
try {
LogicalInput input = inputs.get(inputKey);
if (input == null) {
- throw new ExecException("Input GroupVertex " + inputKey + " is missing");
+ throw new ExecException("Input VertexGroup " + inputKey + " is missing");
}
reader = (KeyValuesReader) input.getReader();
hasNext = reader.next();
@@ -79,8 +79,12 @@ public class POVertexGroupInputTez exten
try {
while (hasNext) {
if (reader.getCurrentValues().iterator().hasNext()) {
- NullableTuple val = (NullableTuple) reader.getCurrentValues().iterator().next();
- return new Result(POStatus.STATUS_OK, val.getValueAsPigType());
+ NullableTuple key = (NullableTuple) reader.getCurrentKey();
+ // The value is always null since the whole record is
+ // shuffled as the key. Call next() on iterator to just move
+ // it forward
+ reader.getCurrentValues().iterator().next();
+ return new Result(POStatus.STATUS_OK, key.getValueAsPigType());
}
hasNext = reader.next();
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Mar 26 17:51:56 2014
@@ -1978,7 +1978,8 @@ public class TezCompiler extends PhyPlan
TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
// TODO: Use POValueOutputTez instead of POLocalRearrange and
// unsorted shuffle with TEZ-661 and PIG-3775.
- outputs[i] = localRearrangeFactory.create(LocalRearrangeType.NULL);
+ outputs[i] = localRearrangeFactory.create();
+ outputs[i].setUnion(true);
prevTezOp.plan.addAsLeaf(outputs[i]);
prevTezOp.setClosed(true);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Mar 26 17:51:56 2014
@@ -200,8 +200,8 @@ public class TezDagBuilder extends TezOp
private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to)
throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
- setIntermediateInputKeyValue(DataType.BYTEARRAY, conf, null);
- setIntermediateOutputKeyValue(DataType.BYTEARRAY, conf, null);
+ setIntermediateInputKeyValue(DataType.TUPLE, conf, null);
+ setIntermediateOutputKeyValue(DataType.TUPLE, conf, null);
MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
return new GroupInputEdge(from, to, new EdgeProperty(