You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/04/13 01:38:28 UTC
svn commit: r1738879 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
test/org/apache/pig/tez/
Author: rohini
Date: Tue Apr 12 23:38:28 2016
New Revision: 1738879
URL: http://svn.apache.org/viewvc?rev=1738879&view=rev
Log:
PIG-4868: Low values for bytes.per.reducer configured by user not honored in Tez for inputs (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 12 23:38:28 2016
@@ -109,6 +109,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4868: Low values for bytes.per.reducer configured by user not honored in Tez for inputs (rohini)
+
PIG-4869: Removing unwanted configuration in Tez broke ConfiguredFailoverProxyProvider (rohini)
PIG-4867: -stop_on_failure does not work with Tez (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Apr 12 23:38:28 2016
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
return reducers;
}
- static long getTotalInputFileSize(Configuration conf,
+ public static long getTotalInputFileSize(Configuration conf,
List<POLoad> lds, Job job) throws IOException {
return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
}
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
/**
* Get the input size for as many inputs as possible. Inputs that do not report
* their size nor can pig look that up itself are excluded from this size.
- *
+ *
* @param conf Configuration
* @param lds List of POLoads
* @param job Job
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Apr 12 23:38:28 2016
@@ -235,6 +235,7 @@ public class TezOperator extends Operato
}
private LoaderInfo loaderInfo = new LoaderInfo();
+ private long totalInputFilesSize = -1;
public TezOperator(OperatorKey k) {
super(k);
@@ -651,6 +652,14 @@ public class TezOperator extends Operato
return loaderInfo;
}
+ public long getTotalInputFilesSize() {
+ return totalInputFilesSize;
+ }
+
+ public void setTotalInputFilesSize(long totalInputFilesSize) {
+ this.totalInputFilesSize = totalInputFilesSize;
+ }
+
public void setUseGraceParallelism(boolean useGraceParallelism) {
this.useGraceParallelism = useGraceParallelism;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Tue Apr 12 23:38:28 2016
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -175,6 +176,7 @@ public class LoaderProcessor extends Tez
// splits can be moved to if(loads) block below
int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
tezOp.setRequestedParallelism(parallelism);
+ tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
}
return lds;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Tue Apr 12 23:38:28 2016
@@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper
*/
public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
- static private int maxTaskCount;
static final double DEFAULT_FLATTEN_FACTOR = 10;
static final double DEFAULT_FILTER_FACTOR = 0.7;
static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -76,6 +75,8 @@ public class TezOperDependencyParallelis
static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
private PigContext pc;
+ private int maxTaskCount;
+ private long bytesPerReducer;
@Override
public void setPigContext(PigContext pc) {
@@ -94,6 +95,8 @@ public class TezOperDependencyParallelis
maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+ bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+
// If parallelism is set explicitly, respect it
if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
@@ -130,6 +133,12 @@ public class TezOperDependencyParallelis
boolean applyFactor = !tezOper.isUnion();
if (!pred.isVertexGroup() && applyFactor) {
predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
+ if (pred.getTotalInputFilesSize() > 0) {
+ // Estimate similar to mapreduce and use the maximum of two
+ int parallelismBySize = (int) Math.ceil((double) pred
+ .getTotalInputFilesSize() / bytesPerReducer);
+ predParallelism = Math.max(predParallelism, parallelismBySize);
+ }
}
estimatedParallelism += predParallelism;
}
@@ -156,12 +165,6 @@ public class TezOperDependencyParallelis
roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
}
- if (roundedEstimatedParallelism == 0) {
- throw new IOException("Estimated parallelism for "
- + tezOper.getOperatorKey().toString()
- + " is 0 which is unexpected");
- }
-
return roundedEstimatedParallelism;
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Tue Apr 12 23:38:28 2016
@@ -149,6 +149,43 @@ public class TestTezAutoParallelism {
}
});
assertEquals(files.length, 1);
+ fs.delete(new Path("output1"), true);
+ }
+
+ @Test
+ public void testBytesPerReducer() throws IOException{
+
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+
+ StringWriter writer = new StringWriter();
+ Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
+ try {
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = group A by name;");
+ pigServer.store("B", "output1");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(files.length, 10);
+ String log = writer.toString();
+ assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
+ assertTrue(log.contains("For vertex - scope-14: parallelism=10"));
+ } finally {
+ Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class);
+ Util.deleteFile(cluster, "output1");
+ }
}
@Test