You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/23 21:56:36 UTC
svn commit: r1589504 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/...
Author: rohini
Date: Wed Apr 23 19:56:35 2014
New Revision: 1589504
URL: http://svn.apache.org/r1589504
Log:
PIG-3908: Fix UnionOptimizer bug with expressions and MR compression settings not being honored (rohini)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
pig/branches/tez/test/e2e/pig/tests/nightly.conf
pig/branches/tez/test/tez-tests
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Apr 23 19:56:35 2014
@@ -519,16 +519,7 @@ public class JobControlCompiler{
log.info("mapred.job.reduce.markreset.buffer.percent is set to " + conf.get("mapred.job.reduce.markreset.buffer.percent"));
}
- // Convert mapred.output.* to output.compression.*, See PIG-1791
- if( "true".equals( conf.get( "mapred.output.compress" ) ) ) {
- conf.set( "output.compression.enabled", "true" );
- String codec = conf.get( "mapred.output.compression.codec" );
- if( codec == null ) {
- throw new JobCreationException("'mapred.output.compress' is set but no value is specified for 'mapred.output.compression.codec'." );
- } else {
- conf.set( "output.compression.codec", codec );
- }
- }
+ configureCompression(conf);
try{
//Process the POLoads
@@ -701,7 +692,7 @@ public class JobControlCompiler{
// the OutputFormat we report to Hadoop is always PigOutputFormat which
// can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
try {
Class<?> clazz = PigContext.resolveClassName(
@@ -734,7 +725,7 @@ public class JobControlCompiler{
if(!pigContext.inIllustrator)
mro.reducePlan.remove(st);
}
-
+
MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf);
}
else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
@@ -964,6 +955,19 @@ public class JobControlCompiler{
}
}
+ public static void configureCompression(Configuration conf) {
+ // Convert mapred.output.* to output.compression.*, See PIG-1791
+ if( "true".equals( conf.get( "mapred.output.compress" ) ) ) {
+ conf.set( "output.compression.enabled", "true" );
+ String codec = conf.get( "mapred.output.compression.codec" );
+ if( codec == null ) {
+ throw new IllegalArgumentException("'mapred.output.compress' is set but no value is specified for 'mapred.output.compression.codec'." );
+ } else {
+ conf.set( "output.compression.codec", codec );
+ }
+ }
+ }
+
/**
* Adjust the number of reducers based on the default_parallel, requested parallel and estimated
* parallel. For sampler jobs, we also adjust the next job in advance to get its runtime parallel as
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Wed Apr 23 19:56:35 2014
@@ -29,9 +29,10 @@ import java.util.Map;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryComparisonOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -39,18 +40,18 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.MultiMap;
/**
- *
- * The base class for all types of physical plans.
+ *
+ * The base class for all types of physical plans.
* This extends the Operator Plan.
*
*/
public class PhysicalPlan extends OperatorPlan<PhysicalOperator> implements Cloneable {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
-
+
// marker to indicate whether all input for this plan
// has been sent - this is currently used in POStream
// to know if all map() calls and reduce() calls are finished
@@ -58,18 +59,18 @@ public class PhysicalPlan extends Operat
public boolean endOfAllInput = false;
private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
-
+
public PhysicalPlan() {
super();
}
-
+
public void attachInput(Tuple t){
List<PhysicalOperator> roots = getRoots();
for (PhysicalOperator operator : roots) {
operator.attachInput(t);
}
}
-
+
public void detachInput(){
for(PhysicalOperator op : getRoots())
op.detachInput();
@@ -117,7 +118,7 @@ public class PhysicalPlan extends Operat
ps.println("<physicalPlan>XML Not Supported</physicalPlan>");
return;
}
-
+
ps.println("#-----------------------------------------------");
ps.println("# Physical Plan:");
ps.println("#-----------------------------------------------");
@@ -132,22 +133,22 @@ public class PhysicalPlan extends Operat
}
ps.println("");
}
-
+
@Override
public void connect(PhysicalOperator from, PhysicalOperator to)
throws PlanException {
-
+
super.connect(from, to);
to.setInputs(getPredecessors(to));
}
-
+
/*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
if(!to.supportsMultipleInputs()){
throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
}
-
+
}*/
-
+
@Override
public void remove(PhysicalOperator op) {
op.setInputs(null);
@@ -169,7 +170,7 @@ public class PhysicalPlan extends Operat
}
super.remove(op);
}
-
+
/* (non-Javadoc)
* @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
*/
@@ -187,10 +188,10 @@ public class PhysicalPlan extends Operat
if(inputs.get(i) == oldNode) {
inputs.set(i, newNode);
}
- }
+ }
}
}
-
+
}
/* (non-Javadoc)
@@ -226,7 +227,7 @@ public class PhysicalPlan extends Operat
// clones, create a map between clone and original. Then walk the
// connections in this plan and create equivalent connections in the
// clone.
- Map<PhysicalOperator, PhysicalOperator> matches =
+ Map<PhysicalOperator, PhysicalOperator> matches =
new HashMap<PhysicalOperator, PhysicalOperator>(mOps.size());
for (PhysicalOperator op : mOps.keySet()) {
PhysicalOperator c = op.clone();
@@ -264,7 +265,7 @@ public class PhysicalPlan extends Operat
for (PhysicalOperator op : mOps.keySet()) {
List<PhysicalOperator> inputs = op.getInputs();
if (inputs == null || inputs.size() == 0) continue;
- List<PhysicalOperator> newInputs =
+ List<PhysicalOperator> newInputs =
new ArrayList<PhysicalOperator>(inputs.size());
PhysicalOperator cloneOp = matches.get(op);
if (cloneOp == null) {
@@ -281,13 +282,17 @@ public class PhysicalPlan extends Operat
}
cloneOp.setInputs(newInputs);
}
-
+
for (PhysicalOperator op : mOps.keySet()) {
- if (op instanceof UnaryComparisonOperator) {
- UnaryComparisonOperator orig = (UnaryComparisonOperator)op;
- UnaryComparisonOperator cloneOp = (UnaryComparisonOperator)matches.get(op);
- cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
+ if (op instanceof ComparisonOperator) {
+ ComparisonOperator orig = (ComparisonOperator)op;
+ ComparisonOperator cloneOp = (ComparisonOperator)matches.get(op);
cloneOp.setOperandType(orig.getOperandType());
+ }
+ if (op instanceof UnaryExpressionOperator) {
+ UnaryExpressionOperator orig = (UnaryExpressionOperator)op;
+ UnaryExpressionOperator cloneOp = (UnaryExpressionOperator)matches.get(op);
+ cloneOp.setExpr((ExpressionOperator)matches.get(orig.getExpr()));
} else if (op instanceof BinaryExpressionOperator) {
BinaryExpressionOperator orig = (BinaryExpressionOperator)op;
BinaryExpressionOperator cloneOp = (BinaryExpressionOperator)matches.get(op);
@@ -304,11 +309,11 @@ public class PhysicalPlan extends Operat
return clone;
}
-
+
public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
this.opmap = opmap;
}
-
+
public void resetOpMap()
{
opmap = null;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Wed Apr 23 19:56:35 2014
@@ -65,7 +65,7 @@ public class SecondaryKeyOptimizerTez ex
break;
}
}
-
+
if (connectingLR == null) {
continue;
}
@@ -101,9 +101,9 @@ public class SecondaryKeyOptimizerTez ex
inEdge.setUseSecondaryKey(true);
inEdge.setSecondarySortOrder(info.getSecondarySortOrder());
log.info("Using Secondary Key Optimization in the edge between vertex - "
- + to.getOperatorKey()
+ + from.getOperatorKey()
+ " and vertex - "
- + from.getOperatorKey());
+ + to.getOperatorKey());
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Apr 23 19:56:35 2014
@@ -113,8 +113,8 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
/**
* A visitor to construct DAG out of Tez plan.
@@ -330,7 +330,7 @@ public class TezDagBuilder extends TezOp
conf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
- MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+ MRToTezHelper.processMRSettings(conf, globalConf);
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -506,7 +506,7 @@ public class TezDagBuilder extends TezOp
UDFContext.getUDFContext().serialize(payloadConf);
- MRToTezHelper.convertMRToTezRuntimeConf(payloadConf, globalConf);
+ MRToTezHelper.processMRSettings(payloadConf, globalConf);
if (!pc.inIllustrator) {
for (POStore store : stores) {
@@ -559,6 +559,7 @@ public class TezDagBuilder extends TezOp
}
tezOp.setRequestedParallelism(parallelism);
}
+
Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
@@ -913,4 +914,5 @@ public class TezDagBuilder extends TezOp
comparatorForKeyType(keyType), RawComparator.class);
}
}
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Wed Apr 23 19:56:35 2014
@@ -17,7 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.util;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -25,7 +27,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.impl.util.Pair;
import org.apache.tez.common.TezJobConfig;
@@ -43,11 +49,14 @@ public class MRToTezHelper {
*/
private static Map<String, Pair<String, String>> mrToTezIOParamMap = new HashMap<String, Pair<String, String>>();
+ private static List<String> mrSettingsToRetain = new ArrayList<String>();
+
private MRToTezHelper() {
}
static {
populateMRToTezIOParamMap();
+ populateMRSettingsToRetain();
}
private static void populateMRToTezIOParamMap() {
@@ -63,6 +72,25 @@ public class MRToTezHelper {
}
+ private static void populateMRSettingsToRetain() {
+
+ // FileInputFormat
+ mrSettingsToRetain.add(FileInputFormat.INPUT_DIR);
+ mrSettingsToRetain.add(FileInputFormat.SPLIT_MAXSIZE);
+ mrSettingsToRetain.add(FileInputFormat.SPLIT_MINSIZE);
+ mrSettingsToRetain.add(FileInputFormat.PATHFILTER_CLASS);
+ mrSettingsToRetain.add(FileInputFormat.NUM_INPUT_FILES);
+ mrSettingsToRetain.add(FileInputFormat.INPUT_DIR_RECURSIVE);
+
+ // FileOutputFormat
+ mrSettingsToRetain.add("mapreduce.output.basename");
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS);
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS_CODEC);
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS_TYPE);
+ mrSettingsToRetain.add(FileOutputFormat.OUTDIR);
+ mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
+ }
+
public static TezConfiguration getDAGAMConfFromMRConf(
TezConfiguration tezConf) {
@@ -107,26 +135,50 @@ public class MRToTezHelper {
return dagAMConf;
}
- public static void convertMRToTezRuntimeConf(Configuration conf, Configuration baseConf) {
- for (Entry<String, String> dep : DeprecatedKeys
- .getMRToTezRuntimeParamMap().entrySet()) {
- if (baseConf.get(dep.getKey()) != null) {
+ /**
+ * Process the mapreduce configuration settings and
+ * - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat)
+ * - convert and set equivalent tez runtime settings
+ * - handle compression related settings
+ *
+ * @param conf Configuration on which the mapreduce settings will have to be transferred
+ * @param mrConf Configuration that contains mapreduce settings
+ */
+ public static void processMRSettings(Configuration conf, Configuration mrConf) {
+ for (String mrSetting : mrSettingsToRetain) {
+ if (mrConf.get(mrSetting) != null) {
+ conf.set(mrSetting, mrConf.get(mrSetting));
+ }
+ }
+ JobControlCompiler.configureCompression(conf);
+ convertMRToTezRuntimeConf(conf, mrConf);
+ }
+
+ /**
+ * Convert MR settings to Tez settings and set on conf.
+ *
+ * @param conf Configuration on which MR equivalent Tez settings should be set
+ * @param mrConf Configuration that contains MR settings
+ */
+ private static void convertMRToTezRuntimeConf(Configuration conf, Configuration mrConf) {
+ for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+ if (mrConf.get(dep.getKey()) != null) {
conf.unset(dep.getKey());
LOG.info("Setting " + dep.getValue() + " to "
- + baseConf.get(dep.getKey()) + " from MR setting "
+ + mrConf.get(dep.getKey()) + " from MR setting "
+ dep.getKey());
- conf.setIfUnset(dep.getValue(), baseConf.get(dep.getKey()));
+ conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
}
}
for (Entry<String, Pair<String, String>> dep : mrToTezIOParamMap.entrySet()) {
- if (baseConf.get(dep.getKey()) != null) {
+ if (mrConf.get(dep.getKey()) != null) {
conf.unset(dep.getKey());
LOG.info("Setting " + dep.getValue() + " to "
- + baseConf.get(dep.getKey()) + " from MR setting "
+ + mrConf.get(dep.getKey()) + " from MR setting "
+ dep.getKey());
- conf.setIfUnset(dep.getValue().first, baseConf.get(dep.getKey()));
- conf.setIfUnset(dep.getValue().second, baseConf.get(dep.getKey()));
+ conf.setIfUnset(dep.getValue().first, mrConf.get(dep.getKey()));
+ conf.setIfUnset(dep.getValue().second, mrConf.get(dep.getKey()));
}
}
}
Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Wed Apr 23 19:56:35 2014
@@ -1455,6 +1455,16 @@ i = group h by name;
i = foreach i generate group, SUM(h.age);
store i into ':OUTPATH:';\,
},
+ {
+ 'num' => 12,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:double);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age:int, gpa:double);
+c = union a, b;
+-- Exercise all expression operators --
+d = foreach c generate (name is not NULL? UPPER(name) : 'FNU LNU') as name, (age < 30 ? -1 : age) as age, (gpa is NULL ? 0.0 : ((gpa > 0.5 AND gpa < 1.0) ? 1 : gpa)) as gpa;
+e = filter d by (name matches '.*MIKE.*') OR (NOT (gpa + 1.5 > 4));
+store e into ':OUTPATH:';\,
+ },
]
},
{
Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1589504&r1=1589503&r2=1589504&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Wed Apr 23 19:56:35 2014
@@ -1,21 +1,21 @@
-**/TestTezCompiler.java
-**/TestTezJobControlCompiler.java
-**/TestTezLauncher.java
-**/TestCombiner.java
**/TestAccumulator.java
-**/TestSkewedJoin.java
-**/TestSplitStore.java
-**/TestCustomPartitioner.java
-**/TestPigContext.java
-**/TestPigStorage.java
-**/TestNestedForeach.java
-**/TestEvalPipeline.java
-**/TestPigServer.java
**/TestAlgebraicEval.java
+**/TestBZip.java
**/TestBestFitCast.java
**/TestBinaryExpressionOps.java
**/TestBuiltin.java
-**/TestBZip.java
-**/TestCompressedFiles.java
**/TestCharArrayToNumeric.java
+**/TestCombiner.java
+**/TestCompressedFiles.java
+**/TestCustomPartitioner.java
+**/TestEvalPipeline.java
+**/TestNestedForeach.java
+**/TestPigContext.java
+**/TestPigServer.java
+**/TestPigStorage.java
**/TestSecondarySortTez.java
+**/TestSkewedJoin.java
+**/TestSplitStore.java
+**/TestTezCompiler.java
+**/TestTezJobControlCompiler.java
+**/TestTezLauncher.java