You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/16 18:54:18 UTC
svn commit: r1744111 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ test/org/apache/pig/test/data/GoldenFiles/tez/
Author: rohini
Date: Mon May 16 18:54:18 2016
New Revision: 1744111
URL: http://svn.apache.org/viewvc?rev=1744111&view=rev
Log:
PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/JVMReuseImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 16 18:54:18 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)
+
PIG-4874: Remove schema tuple reference overhead for replicate join hashmap (rohini)
PIG-4879: Pull latest version of joda-time (rohini)
Modified: pig/trunk/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/JVMReuseImpl.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/trunk/src/org/apache/pig/JVMReuseImpl.java Mon May 16 18:54:18 2016
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -47,6 +48,7 @@ public class JVMReuseImpl {
PigGenericMapReduce.staticDataCleanup();
PigStatusReporter.staticDataCleanup();
PigCombiner.Combine.staticDataCleanup();
+ DistinctCombiner.Combine.staticDataCleanup();
String className = null;
String msg = null;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Mon May 16 18:54:18 2016
@@ -22,43 +22,48 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
/**
* A special implementation of combiner used only for distinct. This combiner
* does not even parse out the records. It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
*/
public class DistinctCombiner {
- public static class Combine
+ public static class Combine
extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
private final Log log = LogFactory.getLog(getClass());
- ProgressableReporter pigReporter;
-
- /**
- * Configures the reporter
- */
+ private static boolean firstTime = true;
+
+ //@StaticDataCleanup
+ public static void staticDataCleanup() {
+ firstTime = true;
+ }
+
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- pigReporter = new ProgressableReporter();
+ Configuration jConf = context.getConfiguration();
+ // Avoid log spamming
+ if (firstTime) {
+ log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+ firstTime = false;
+ }
}
-
+
/**
* The reduce function which removes values.
*/
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
-
- pigReporter.setRep(context);
// Take the first value and the key and collect
// just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
NullableTuple val = iter.next();
context.write(key, val);
}
+
}
-
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon May 16 18:54:18 2016
@@ -56,6 +56,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -440,7 +441,14 @@ public class TezDagBuilder extends TezOp
Configuration conf = new Configuration(pigContextConf);
- if (!combinePlan.isEmpty()) {
+ if (edge.needsDistinctCombiner()) {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+ MRCombiner.class.getName());
+ conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+ DistinctCombiner.Combine.class.getName());
+ log.info("Setting distinct combiner class between "
+ + from.getOperatorKey() + " and " + to.getOperatorKey());
+ } else if (!combinePlan.isEmpty()) {
udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
addCombiner(combinePlan, to, conf, isMergedInput);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Mon May 16 18:54:18 2016
@@ -655,15 +655,8 @@ public class TezCompiler extends PhyPlan
blocking();
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
- // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
- // with a global variable and a specific DistinctCombiner class. This seems better.
- PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
- addDistinctPlan(combinePlan, 1);
-
- POLocalRearrangeTez clr = localRearrangeFactory.create();
- clr.setOutputKey(curTezOp.getOperatorKey().toString());
- clr.setDistinct(true);
- combinePlan.addAsLeaf(clr);
+ TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
+ edge.setNeedsDistinctCombiner(true);
curTezOp.markDistinct();
addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Mon May 16 18:54:18 2016
@@ -33,6 +33,7 @@ import org.apache.tez.runtime.library.ou
public class TezEdgeDescriptor implements Serializable {
// Combiner runs on both input and output of Tez edge.
transient public PhysicalPlan combinePlan;
+ private boolean needsDistinctCombiner;
public String inputClassName;
public String outputClassName;
@@ -65,6 +66,14 @@ public class TezEdgeDescriptor implement
dataMovementType = DataMovementType.SCATTER_GATHER;
}
+ public boolean needsDistinctCombiner() {
+ return needsDistinctCombiner;
+ }
+
+ public void setNeedsDistinctCombiner(boolean nic) {
+ needsDistinctCombiner = nic;
+ }
+
public boolean isUseSecondaryKey() {
return useSecondaryKey;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Mon May 16 18:54:18 2016
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla
printer.setVerbose(isVerbose);
printer.visit();
mStream.println();
+ } else if (edgeDesc.needsDistinctCombiner()) {
+ mStream.println("# Combine plan on edge <" + inEdge + ">");
+ mStream.println(DistinctCombiner.Combine.class.getName());
}
}
}
Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld?rev=1744111&r1=1744110&r2=1744111&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld Mon May 16 18:54:18 2016
@@ -26,15 +26,7 @@ b: Local Rearrange[tuple]{tuple}(true) -
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-16
# Combine plan on edge <scope-13>
-Local Rearrange[tuple]{tuple}(true) - scope-21 -> scope-16
-| |
-| Project[tuple][*] - scope-20
-|
-|---New For Each(true)[bag] - scope-19
- | |
- | Project[tuple][0] - scope-18
- |
- |---Package(Packager)[tuple]{tuple} - scope-17
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
# Plan on vertex
c: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-12
|
@@ -42,8 +34,8 @@ c: Store(file:///tmp/pigoutput:org.apach
| |
| Project[int][1] - scope-9
|
- |---New For Each(true)[bag] - scope-24
+ |---New For Each(true)[bag] - scope-19
| |
- | Project[tuple][0] - scope-23
+ | Project[tuple][0] - scope-18
|
- |---Package(Packager)[tuple]{tuple} - scope-22
+ |---Package(Packager)[tuple]{tuple} - scope-17