You are viewing a plain-text version of this message; the link to its canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/16 18:54:18 UTC

svn commit: r1744111 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ test/org/apache/pig/test/data/GoldenFiles/tez/

Author: rohini
Date: Mon May 16 18:54:18 2016
New Revision: 1744111

URL: http://svn.apache.org/viewvc?rev=1744111&view=rev
Log:
PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/JVMReuseImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 16 18:54:18 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)
+
 PIG-4874: Remove schema tuple reference overhead for replicate join hashmap (rohini)
 
 PIG-4879: Pull latest version of joda-time (rohini)

Modified: pig/trunk/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/JVMReuseImpl.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/trunk/src/org/apache/pig/JVMReuseImpl.java Mon May 16 18:54:18 2016
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -47,6 +48,7 @@ public class JVMReuseImpl {
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
+        DistinctCombiner.Combine.staticDataCleanup();
 
         String className = null;
         String msg = null;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Mon May 16 18:54:18 2016
@@ -22,43 +22,48 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
  * A special implementation of combiner used only for distinct.  This combiner
  * does not even parse out the records.  It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
  */
 public class DistinctCombiner {
 
-    public static class Combine 
+    public static class Combine
         extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
+
         private final Log log = LogFactory.getLog(getClass());
 
-        ProgressableReporter pigReporter;
-        
-        /**
-         * Configures the reporter 
-         */
+        private static boolean firstTime = true;
+
+        //@StaticDataCleanup
+        public static void staticDataCleanup() {
+            firstTime = true;
+        }
+
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            pigReporter = new ProgressableReporter();
+            Configuration jConf = context.getConfiguration();
+            // Avoid log spamming
+            if (firstTime) {
+                log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+                firstTime = false;
+            }
         }
-        
+
         /**
          * The reduce function which removes values.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                 throws IOException, InterruptedException {
-            
-            pigReporter.setRep(context);
 
             // Take the first value and the key and collect
             // just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
             NullableTuple val = iter.next();
             context.write(key, val);
         }
+
     }
-    
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon May 16 18:54:18 2016
@@ -56,6 +56,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -440,7 +441,14 @@ public class TezDagBuilder extends TezOp
 
         Configuration conf = new Configuration(pigContextConf);
 
-        if (!combinePlan.isEmpty()) {
+        if (edge.needsDistinctCombiner()) {
+            conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+                    MRCombiner.class.getName());
+            conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+                    DistinctCombiner.Combine.class.getName());
+            log.info("Setting distinct combiner class between "
+                    + from.getOperatorKey() + " and " + to.getOperatorKey());
+        } else if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
             addCombiner(combinePlan, to, conf, isMergedInput);
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Mon May 16 18:54:18 2016
@@ -655,15 +655,8 @@ public class TezCompiler extends PhyPlan
             blocking();
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
 
-            // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
-            // with a global variable and a specific DistinctCombiner class. This seems better.
-            PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
-            addDistinctPlan(combinePlan, 1);
-
-            POLocalRearrangeTez clr = localRearrangeFactory.create();
-            clr.setOutputKey(curTezOp.getOperatorKey().toString());
-            clr.setDistinct(true);
-            combinePlan.addAsLeaf(clr);
+            TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
+            edge.setNeedsDistinctCombiner(true);
 
             curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Mon May 16 18:54:18 2016
@@ -33,6 +33,7 @@ import org.apache.tez.runtime.library.ou
 public class TezEdgeDescriptor implements Serializable {
     // Combiner runs on both input and output of Tez edge.
     transient public PhysicalPlan combinePlan;
+    private boolean needsDistinctCombiner;
 
     public String inputClassName;
     public String outputClassName;
@@ -65,6 +66,14 @@ public class TezEdgeDescriptor implement
         dataMovementType = DataMovementType.SCATTER_GATHER;
     }
 
+    public boolean needsDistinctCombiner() {
+        return needsDistinctCombiner;
+    }
+
+    public void setNeedsDistinctCombiner(boolean nic) {
+        needsDistinctCombiner = nic;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Mon May 16 18:54:18 2016
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla
                     printer.setVerbose(isVerbose);
                     printer.visit();
                     mStream.println();
+                } else if (edgeDesc.needsDistinctCombiner()) {
+                    mStream.println("# Combine plan on edge <" + inEdge + ">");
+                    mStream.println(DistinctCombiner.Combine.class.getName());
                 }
             }
         }

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld Mon May 16 18:54:18 2016
@@ -26,15 +26,7 @@ b: Local Rearrange[tuple]{tuple}(true) -
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-16
 # Combine plan on edge <scope-13>
-Local Rearrange[tuple]{tuple}(true) - scope-21	->	 scope-16
-|   |
-|   Project[tuple][*] - scope-20
-|
-|---New For Each(true)[bag] - scope-19
-    |   |
-    |   Project[tuple][0] - scope-18
-    |
-    |---Package(Packager)[tuple]{tuple} - scope-17
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 c: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-12
 |
@@ -42,8 +34,8 @@ c: Store(file:///tmp/pigoutput:org.apach
     |   |
     |   Project[int][1] - scope-9
     |
-    |---New For Each(true)[bag] - scope-24
+    |---New For Each(true)[bag] - scope-19
         |   |
-        |   Project[tuple][0] - scope-23
+        |   Project[tuple][0] - scope-18
         |
-        |---Package(Packager)[tuple]{tuple} - scope-22
+        |---Package(Packager)[tuple]{tuple} - scope-17