You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/10/16 22:48:46 UTC
svn commit: r1709109 - in /pig/branches/branch-0.15: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
src/org/apache/pig/backend/hadoop/executionengine/tez/util/
test/org/apache/pig/tez/
Author: daijy
Date: Fri Oct 16 20:48:45 2015
New Revision: 1709109
URL: http://svn.apache.org/viewvc?rev=1709109&view=rev
Log:
PIG-4703: TezOperator.stores shall not ship to backend
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Fri Oct 16 20:48:45 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4703: TezOperator.stores shall not ship to backend (daijy)
+
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
PIG-4635: NPE while running pig script in tez mode (daijy)
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Oct 16 20:48:45 2015
@@ -490,7 +490,7 @@ public class TezDagBuilder extends TezOp
// usually followed by limit other than store. But would benefit
// cases like skewed join followed by group by.
if (tezOp.getSortOperator().getEstimatedParallelism() != -1
- && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+ && tezOp.getSortOperator().isIntermediateReducer()) {
payloadConf.setLong(
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
intermediateTaskInputSize);
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Oct 16 20:48:45 2015
@@ -32,7 +32,7 @@ import org.apache.tez.runtime.library.ou
*/
public class TezEdgeDescriptor implements Serializable {
// Combiner runs on both input and output of Tez edge.
- public PhysicalPlan combinePlan;
+ transient public PhysicalPlan combinePlan;
public String inputClassName;
public String outputClassName;
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Oct 16 20:48:45 2015
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
@@ -35,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -145,7 +147,7 @@ public class TezOperator extends Operato
private double parallelismFactor = -1;
- private LinkedList<POStore> stores = null;
+ private Boolean intermediateReducer = null;
// Types of blocking operators. For now, we only support the following ones.
public static enum OPER_FEATURE {
@@ -634,11 +636,22 @@ public class TezOperator extends Operato
return parallelismFactor;
}
- public LinkedList<POStore> getStores() throws VisitorException {
- if (stores == null) {
- stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+ public Boolean isIntermediateReducer() throws IOException {
+ if (intermediateReducer == null) {
+ intermediateReducer = false;
+ // set intermediateReducer to true if are no loads or stores in a TezOperator
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+ // Not map and not final reducer
+ if (stores.size() <= 0 &&
+ (getLoaderInfo().getLoads() == null || getLoaderInfo().getLoads().size() <= 0)) {
+ intermediateReducer = true;
+ }
}
- return stores;
+ return intermediateReducer;
+ }
+
+ public void setIntermediateReducer(Boolean intermediateReducer) {
+ this.intermediateReducer = intermediateReducer;
}
public static class VertexGroupInfo {
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Oct 16 20:48:45 2015
@@ -17,7 +17,6 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
-import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -26,13 +25,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
@@ -80,11 +77,6 @@ public class ParallelismSetter extends T
// Can only set parallelism here if the parallelism isn't derived from
// splits
int parallelism = -1;
- boolean intermediateReducer = false;
- LinkedList<POStore> stores = tezOp.getStores();
- if (stores.size() <= 0) {
- intermediateReducer = true;
- }
if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
// requestedParallelism of Loader vertex is handled in LoaderProcessor
// propogate to vertexParallelism
@@ -93,7 +85,6 @@ public class ParallelismSetter extends T
} else {
int prevParallelism = -1;
boolean isOneToOneParallelism = false;
- intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOp);
for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
@@ -126,7 +117,7 @@ public class ParallelismSetter extends T
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
- && intermediateReducer
+ && tezOp.isIntermediateReducer()
&& !tezOp.isDontEstimateParallelism()
&& tezOp.isOverrideIntermediateParallelism()) {
overrideRequestedParallelism = true;
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Oct 16 20:48:45 2015
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -79,15 +80,13 @@ public class TezOperDependencyParallelis
return -1;
}
- boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
-
// TODO: If map opts and reduce opts are same estimate higher parallelism
// for tasks based on the count of number of map tasks else be conservative as now
maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
// If parallelism is set explicitly, respect it
- if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) {
+ if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
}
@@ -129,7 +128,7 @@ public class TezOperDependencyParallelis
int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
- if (intermediateReducer && tezOper.isOverrideIntermediateParallelism()) {
+ if (tezOper.isIntermediateReducer() && tezOper.isOverrideIntermediateParallelism()) {
// Estimated reducers should not be more than the configured limit
roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
int userSpecifiedParallelism = pc.defaultParallel;
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Oct 16 20:48:45 2015
@@ -192,19 +192,4 @@ public class TezCompilerUtil {
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
}
- /**
- * Returns true if there are no loads or stores in a TezOperator.
- * To be called only after LoaderProcessor is called
- */
- static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
- boolean intermediateReducer = false;
- LinkedList<POStore> stores = tezOper.getStores();
- // Not map and not final reducer
- if (stores.size() <= 0 &&
- (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
- intermediateReducer = true;
- }
- return intermediateReducer;
- }
-
}
Modified: pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1709109&r1=1709108&r2=1709109&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri Oct 16 20:48:45 2015
@@ -26,17 +26,23 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import java.util.regex.Pattern;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.test.Util;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.junit.AfterClass;
@@ -282,4 +288,40 @@ public class TestTezGraceParallelism {
Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithUnion");
}
}
+
+ @Test
+ // See PIG-4703: the store job only completes if the schema that StorerWithUDFContextCheck.checkSchema saves into UDFContext can be read back in its setStoreLocation
+ public void testUDFContextSetInBackend() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ File outputDir = File.createTempFile("intemediate", "txt"); // create-then-delete to obtain an unused path that G is stored into below
+ outputDir.delete();
+ pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); // NOTE(review): presumably the order/distinct/group/join pipeline is chosen to produce intermediate Tez vertices — confirm
+ pigServer.registerQuery("B = order A by name;");
+ pigServer.registerQuery("C = distinct B;");
+ pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = group D by name;");
+ pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+ pigServer.registerQuery("G = join C by name left, F by name;");
+ ExecJob job = pigServer.store("G", Util.removeColon(outputDir.getAbsolutePath()), StorerWithUDFContextCheck.class.getName());
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); // a missing schema makes setStoreLocation throw IOException, which should keep the job from COMPLETED
+ }
+
+ static public class StorerWithUDFContextCheck extends PigStorage { // PIG-4703 test helper: PigStorage that saves the schema in checkSchema and requires it in setStoreLocation
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException { // serialize the schema into this storer's UDFContext properties (keyed by class + signature) under "schema"
+ UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{signature})
+ .setProperty("schema", ObjectSerializer.serialize(resourceSchema));
+ }
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException { // read the schema back from the same UDFContext properties; fail if it is not available here
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{signature});
+ ResourceSchema rs = (ResourceSchema)ObjectSerializer.deserialize(udfProps.getProperty("schema"));
+ if (rs == null) { // NOTE(review): assumes deserialize(null) yields null when the "schema" property is absent — confirm
+ throw new IOException("Should not be null");
+ }
+ super.setStoreLocation(location, job);
+ }
+ }
}