You are viewing a plain-text version of this content. The canonical version was linked from the original page; that hyperlink is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/26 18:51:57 UTC

svn commit: r1581967 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: POLocalRearrangeTez.java POVertexGroupInputTez.java TezCompiler.java TezDagBuilder.java

Author: cheolsoo
Date: Wed Mar 26 17:51:56 2014
New Revision: 1581967

URL: http://svn.apache.org/r1581967
Log:
PIG-3743: Use VertexGroup and Alias vertex for union (commit PIG-3743-fix-skew-2.patch)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Wed Mar 26 17:51:56 2014
@@ -68,7 +68,6 @@ public class POLocalRearrangeTez extends
             this.isSkewedJoin = copyTez.isSkewedJoin;
             this.outputKey = copyTez.outputKey;
         }
-
     }
 
     public String getOutputKey() {
@@ -141,9 +140,10 @@ public class POLocalRearrangeTez extends
                     PigNullableWritable key = null;
                     NullableTuple val = null;
                     if (isUnion) {
-                        // Use the entire tuple as both key and value
+                        // Use the whole tuple as key and set value to null
                         key = HDataType.getWritableComparableTypes(result.get(1), keyType);
-                        val = new NullableTuple((Tuple)result.get(1));
+                        val = new NullableTuple();
+                        val.setNull(true);
                     } else {
                         key = HDataType.getWritableComparableTypes(result.get(1), keyType);
                         val = new NullableTuple((Tuple)result.get(2));

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POVertexGroupInputTez.java Wed Mar 26 17:51:56 2014
@@ -65,7 +65,7 @@ public class POVertexGroupInputTez exten
         try {
             LogicalInput input = inputs.get(inputKey);
             if (input == null) {
-                throw new ExecException("Input GroupVertex " + inputKey + " is missing");
+                throw new ExecException("Input VertexGroup " + inputKey + " is missing");
             }
             reader = (KeyValuesReader) input.getReader();
             hasNext = reader.next();
@@ -79,8 +79,12 @@ public class POVertexGroupInputTez exten
         try {
             while (hasNext) {
                 if (reader.getCurrentValues().iterator().hasNext()) {
-                    NullableTuple val = (NullableTuple) reader.getCurrentValues().iterator().next();
-                    return new Result(POStatus.STATUS_OK, val.getValueAsPigType());
+                    NullableTuple key = (NullableTuple) reader.getCurrentKey();
+                    // The value is always null since the whole record is
+                    // shuffled as the key. Call next() on iterator to just move
+                    // it forward
+                    reader.getCurrentValues().iterator().next();
+                    return new Result(POStatus.STATUS_OK, key.getValueAsPigType());
                 }
                 hasNext = reader.next();
             }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Mar 26 17:51:56 2014
@@ -1978,7 +1978,8 @@ public class TezCompiler extends PhyPlan
                 TezCompilerUtil.connect(tezPlan, prevTezOp, newTezOp);
                 // TODO: Use POValueOutputTez instead of POLocalRearrange and
                 // unsorted shuffle with TEZ-661 and PIG-3775.
-                outputs[i] = localRearrangeFactory.create(LocalRearrangeType.NULL);
+                outputs[i] = localRearrangeFactory.create();
+                outputs[i].setUnion(true);
                 prevTezOp.plan.addAsLeaf(outputs[i]);
                 prevTezOp.setClosed(true);
             }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1581967&r1=1581966&r2=1581967&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Mar 26 17:51:56 2014
@@ -200,8 +200,8 @@ public class TezDagBuilder extends TezOp
     private GroupInputEdge newGroupInputEdge(VertexGroup from, Vertex to)
             throws IOException {
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
-        setIntermediateInputKeyValue(DataType.BYTEARRAY, conf, null);
-        setIntermediateOutputKeyValue(DataType.BYTEARRAY, conf, null);
+        setIntermediateInputKeyValue(DataType.TUPLE, conf, null);
+        setIntermediateOutputKeyValue(DataType.TUPLE, conf, null);
         MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
 
         return new GroupInputEdge(from, to, new EdgeProperty(