You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/07/07 22:08:16 UTC
svn commit: r1608593 [1/3] - in /pig/trunk: ./
shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org...
Author: rohini
Date: Mon Jul 7 20:08:14 2014
New Revision: 1608593
URL: http://svn.apache.org/r1608593
Log:
PIG-3935: Port more mini cluster tests to Tez - part 5 (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
pig/trunk/test/org/apache/pig/test/TestParser.java
pig/trunk/test/org/apache/pig/test/TestPigSplit.java
pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
pig/trunk/test/org/apache/pig/test/TestStore.java
pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
pig/trunk/test/tez-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 7 20:08:14 2014
@@ -42,6 +42,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-3935: Port more mini cluster tests to Tez - part 5 (rohini)
+
PIG-3984: PigServer.shutdown removes the tez resource folder (daijy via rohini)
PIG-4048: TEZ-692 has an incompatible API change removing TezSession (rohini)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Jul 7 20:08:14 2014
@@ -157,6 +157,15 @@
<condition property="isWindows">
<equals arg1="${windows}" arg2="true" />
</condition>
+
+ <target name="setTezEnv">
+ <propertyreset name="hadoopversion" value="23" />
+ <propertyreset name="isHadoop23" value="true" />
+ <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
+ <propertyreset name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+ <propertyreset name="src.exclude.dir" value="" />
+ <propertyreset name="test.exec.type" value="tez" />
+ </target>
<target name="setWindowsPath" if="${isWindows}">
<property name="build.path" value="${env.Path};${hadoop.root}\bin" />
@@ -869,10 +878,10 @@
<macro-test-runner test.file="${test.smoke.file}" />
</target>
- <target name="test-tez" depends="setWindowsPath,setLinuxPath,compile-test,jar-withouthadoop,debugger.check" description="Run tez unit tests">
+ <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar-withouthadoop,debugger.check" description="Run tez unit tests">
<macro-test-runner test.file="${test.tez.file}" />
</target>
-
+
<target name="debugger.check" depends="debugger.set,debugger.unset"/>
<target name="debugger.set" if="debugPort">
<property name="debugArgs" value="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=${debugPort}"/>
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Mon Jul 7 20:08:14 2014
@@ -105,8 +105,6 @@ public class TezMiniCluster extends Mini
// Write tez-site.xml
Configuration tez_conf = new Configuration(false);
- // TODO Remove this once TezSession reuse in mini cluster is fixed
- tez_conf.set(PigConfiguration.TEZ_SESSION_REUSE, "false");
// TODO PIG-3659 - Remove this once memory management is fixed
tez_conf.set(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, "20");
tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Jul 7 20:08:14 2014
@@ -81,6 +81,12 @@ public class PigConfiguration {
*/
public static final String OPT_ACCUMULATOR = "opt.accumulator";
+
+ /**
+ * This key is used to configure auto parallelism in tez. Default is true.
+ */
+ public static final String TEZ_AUTO_PARALLELISM = "pig.tez.auto.parallelism";
+
/**
* This key is used to enable union optimization.
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Jul 7 20:08:14 2014
@@ -690,7 +690,7 @@ public class JobControlCompiler{
}
}
- setOutputFormat((JobConf) nwJob.getConfiguration());
+ setOutputFormat(nwJob);
if (mapStores.size() + reduceStores.size() == 1) { // single store case
log.info("Setting up single store job");
@@ -1877,24 +1877,24 @@ public class JobControlCompiler{
}
}
- public static void setOutputFormat(JobConf conf) {
+ public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
// can be wrapped with LazyOutputFormat provided if it is supported by
// the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
- if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
try {
Class<?> clazz = PigContext
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
Method method = clazz.getMethod("setOutputFormatClass",
- JobConf.class, Class.class);
- method.invoke(null, conf, PigOutputFormat.class);
+ org.apache.hadoop.mapreduce.Job.class, Class.class);
+ method.invoke(null, job, PigOutputFormat.class);
} catch (Exception e) {
- conf.set("mapreduce.outputformat.class", PigOutputFormat.class.getName());
+ job.setOutputFormatClass(PigOutputFormat.class);
log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
}
} else {
- conf.set("mapreduce.outputformat.class", PigOutputFormat.class.getName());
+ job.setOutputFormatClass(PigOutputFormat.class);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Mon Jul 7 20:08:14 2014
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -218,7 +219,13 @@ public class PlanPrinter<O extends Opera
inner_plans.addAll(joinPlans.values());
sb.append(planString(inner_plans));
}
- }
+ }
+ else if(node instanceof POLimit) {
+ PhysicalPlan limitPlan = ((POLimit)node).getLimitPlan();
+ if (limitPlan != null) {
+ sb.append(planString(limitPlan));
+ }
+ }
}
if (node instanceof POSplit) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Mon Jul 7 20:08:14 2014
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -107,9 +108,9 @@ public class MultiQueryOptimizerTez exte
addSubPlanPropertiesToParent(tezOp, splitee);
removeSplittee(getPlan(), tezOp, splitee);
- valueOutput.outputKeys.remove(splitee.getOperatorKey().toString());
+ valueOutput.removeOutputKey(splitee.getOperatorKey().toString());
}
- if (!valueOutput.outputKeys.isEmpty()) {
+ if (valueOutput.getTezOutputs().length > 0) {
// We still need valueOutput
PhysicalPlan phyPlan = new PhysicalPlan();
phyPlan.addAsLeaf(valueOutput);
@@ -145,6 +146,15 @@ public class MultiQueryOptimizerTez exte
input.replaceInput(splittee.getOperatorKey().toString(),
splitter.getOperatorKey().toString());
}
+ List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
+ for (POUserFunc userFunc : userFuncs) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ TezInput tezInput = (TezInput)userFunc.getFunc();
+ tezInput.replaceInput(splittee.getOperatorKey().toString(),
+ splitter.getOperatorKey().toString());
+ userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+ }
+ }
} catch (VisitorException e) {
throw new PlanException(e);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Mon Jul 7 20:08:14 2014
@@ -21,6 +21,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -34,8 +36,6 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
/**
* POLocalRearrangeTez is used to write to a Tez OnFileSortedOutput
@@ -44,6 +44,7 @@ import org.apache.tez.runtime.library.ou
public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class);
protected String outputKey;
protected transient KeyValueWriter writer;
@@ -108,28 +109,13 @@ public class POLocalRearrangeTez extends
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
- LogicalOutput logicalOut = outputs.get(outputKey);
- if (logicalOut == null) {
- throw new ExecException(
- "POLocalRearrangeTez only accepts OnFileSortedOutput (shuffle)"
- + " or OnFileUnorderedKVOutput(broadcast). key = "
- + outputKey + ", outputs = " + outputs
- + ", operator = " + this);
+ LogicalOutput output = outputs.get(outputKey);
+ if (output == null) {
+ throw new ExecException("Output to vertex " + outputKey + " is missing");
}
try {
- if (logicalOut instanceof OnFileSortedOutput) {
- writer = ((OnFileSortedOutput) logicalOut).getWriter();
- } else if (logicalOut instanceof OnFileUnorderedKVOutput) {
- writer = ((OnFileUnorderedKVOutput) logicalOut).getWriter();
- } else {
- throw new ExecException(
- "POLocalRearrangeTez only accepts OnFileSortedOutput (shuffle)"
- + " or OnFileUnorderedKVOutput(broadcast). key = "
- + outputKey + ", outputs = " + outputs
- + ", operator = " + this);
- }
- } catch (ExecException e) {
- throw e;
+ writer = (KeyValueWriter) output.getWriter();
+ LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
} catch (Exception e) {
throw new ExecException(e);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Mon Jul 7 20:08:14 2014
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -37,21 +36,31 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
-public class POValueOutputTez extends PhysicalOperator implements TezOutput {
+public class POValueOutputTez extends PhysicalOperator implements TezOutput, TezTaskConfigurable {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(POValueOutputTez.class);
+
+ private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ private boolean taskIndexWithRecordIndexAsKey;
// TODO Change this to outputKey and write only once
// when a shared edge support is available in Tez
- protected Set<String> outputKeys = new HashSet<String>();
+ private Set<String> outputKeys = new HashSet<String>();
// TODO Change this to value only writer after implementing
// value only input output
- protected transient List<KeyValueWriter> writers;
+ private transient List<KeyValueWriter> writers;
+ private transient Object key;
+ private transient int taskIndex;
+ private transient long count;
+
public static EmptyWritable EMPTY_KEY = new EmptyWritable();
@@ -59,6 +68,24 @@ public class POValueOutputTez extends Ph
super(k);
}
+ public boolean isTaskIndexWithRecordIndexAsKey() {
+ return taskIndexWithRecordIndexAsKey;
+ }
+
+ /*
+ * Sets a tuple of (task index, record index) as the key, e.g. (0,0), (0,1), etc.
+ * Default is empty key
+ */
+ public void setTaskIndexWithRecordIndexAsKey(boolean taskIndexWithRecordIndexAsKey) {
+ this.taskIndexWithRecordIndexAsKey = taskIndexWithRecordIndexAsKey;
+ }
+
+ @Override
+ public void initialize(TezProcessorContext processorContext)
+ throws ExecException {
+ taskIndex = processorContext.getTaskIndex();
+ }
+
@Override
public String[] getTezOutputs() {
return outputKeys.toArray(new String[outputKeys.size()]);
@@ -89,6 +116,10 @@ public class POValueOutputTez extends Ph
throw new ExecException(e);
}
}
+ count = 0;
+ if (!taskIndexWithRecordIndexAsKey) {
+ key = EMPTY_KEY;
+ }
}
public void addOutputKey(String outputKey) {
@@ -117,7 +148,13 @@ public class POValueOutputTez extends Ph
}
for (KeyValueWriter writer : writers) {
try {
- writer.write(EMPTY_KEY, inp.result);
+ if (taskIndexWithRecordIndexAsKey) {
+ Tuple tuple = tupleFactory.newTuple(2);
+ tuple.set(0, taskIndex);
+ tuple.set(1, count++);
+ key = tuple;
+ }
+ writer.write(key, inp.result);
} catch (IOException e) {
throw new ExecException(e);
}
@@ -164,24 +201,4 @@ public class POValueOutputTez extends Ph
}
}
- //TODO: Remove after PIG-3775/TEZ-661
- public static class EmptyWritableComparator implements RawComparator<EmptyWritable> {
-
- @Override
- public int compare(EmptyWritable o1, EmptyWritable o2) {
- return 0;
- }
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- // 0 - Reverses the input order. 0 groups all values into
- // single record on reducer which is additional overhead.
- // -1, 1 - Returns input in random order. But comparator is invoked way more
- // times than 0. Compared to 1, -1 invokes comparator even more.
- // Going with 0 for now. After TEZ-661 this will not be required any more.
- return 0;
- }
-
- }
-
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Jul 7 20:08:14 2014
@@ -91,7 +91,7 @@ public class PigProcessor implements Log
private Configuration conf;
private PigHadoopLogger pigHadoopLogger;
-
+
private TezProcessorContext processorContext;
public static String sampleVertex;
@@ -102,7 +102,7 @@ public class PigProcessor implements Log
public void initialize(TezProcessorContext processorContext)
throws Exception {
this.processorContext = processorContext;
-
+
// Reset any static variables to avoid conflict in container-reuse.
sampleVertex = null;
sampleMap = null;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Mon Jul 7 20:08:14 2014
@@ -78,9 +78,10 @@ public class ReadScalarsTez extends Eval
KeyValueReader reader = (KeyValueReader) input.getReader();
if (reader.next()) {
t = (Tuple) reader.getCurrentValue();
+ String first = t == null ? null : t.toString();
if (reader.next()) {
String msg = "Scalar has more than one row in the output. "
- + "1st : " + t + ", 2nd :"
+ + "1st : " + first + ", 2nd :"
+ reader.getCurrentValue();
throw new ExecException(msg);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jul 7 20:08:14 2014
@@ -42,6 +42,7 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
@@ -107,6 +108,7 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -260,12 +262,11 @@ public class TezCompiler extends PhyPlan
TezOperator from = phyToTezOpMap.get(store);
- FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
- userFunc.setFuncSpec(newSpec);
-
if (storeSeen.containsKey(store)) {
- storeSeen.get(store).outputKeys.add(tezOp.getOperatorKey().toString());
+ storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
} else {
+ FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
+ userFunc.setFuncSpec(newSpec);
POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
output.addOutputKey(tezOp.getOperatorKey().toString());
from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
@@ -771,28 +772,28 @@ public class TezCompiler extends PhyPlan
}
}
- // Need to add POLocalRearrange to the end of the last tezOp before we shuffle.
- POLocalRearrange lr = localRearrangeFactory.create();
- lr.setAlias(op.getAlias());
- curTezOp.plan.addAsLeaf(lr);
+ // Need to add POValueOutputTez to the end of the last tezOp
+ POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ output.setAlias(op.getAlias());
+ curTezOp.plan.addAsLeaf(output);
+ TezOperator prevOp = curTezOp;
- // Mark the start of a new TezOperator, connecting the inputs.
+ // Mark the start of a new TezOperator which will do the actual limiting with 1 task.
blocking();
- // As an optimization, don't do any sorting in the shuffle, as LIMIT does not make any
- // ordering guarantees.
- // TODO Enable this after TEZ-661
- // TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
- // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
- // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ // Explicitly set the parallelism for the new vertex to 1.
+ curTezOp.setRequestedParallelism(1);
- // Then add a POPackage and a POForEach to the start of the new tezOp.
- POPackage pkg = getPackage(1, DataType.TUPLE);
- POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
- pkg.setAlias(op.getAlias());
- forEach.setAlias(op.getAlias());
- curTezOp.plan.add(pkg);
- curTezOp.plan.addAsLeaf(forEach);
+ output.addOutputKey(curTezOp.getOperatorKey().toString());
+ // LIMIT does not make any ordering guarantees and this is unsorted shuffle.
+ TezEdgeDescriptor edge = curTezOp.inEdges.get(prevOp.getOperatorKey());
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+
+ // Then add a POValueInputTez to the start of the new tezOp.
+ POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+ input.setAlias(op.getAlias());
+ input.setInputKey(prevOp.getOperatorKey().toString());
+ curTezOp.plan.addAsLeaf(input);
if (!pigContext.inIllustrator) {
POLimit limitCopy = new POLimit(OperatorKey.genOpKey(scope));
@@ -804,8 +805,6 @@ public class TezCompiler extends PhyPlan
curTezOp.plan.addAsLeaf(op);
}
- // Explicitly set the parallelism for the new vertex to 1.
- curTezOp.setRequestedParallelism(1);
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1972,6 +1971,7 @@ public class TezCompiler extends PhyPlan
if (rp == -1) {
rp = pigContext.defaultParallel;
}
+
// if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp
Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
@@ -2020,6 +2020,48 @@ public class TezCompiler extends PhyPlan
// curTezOp.isUDFComparatorUsed = true;
// }
quantJobParallelismPair.first.sortOperator = sortOpers[1];
+
+ // If Order by followed by Limit and parallelism of order by is not 1
+ // add a new vertex for Limit with parallelism 1.
+ // Equivalent of LimitAdjuster.java in MR
+ if (op.isLimited() && rp != 1) {
+ POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ output.setAlias(op.getAlias());
+ sortOpers[1].plan.addAsLeaf(output);
+
+ TezOperator limitOper = getTezOp();
+ tezPlan.add(limitOper);
+ curTezOp = limitOper;
+
+ // Explicitly set the parallelism for the new vertex to 1.
+ limitOper.setRequestedParallelism(1);
+
+ edge = TezCompilerUtil.connect(tezPlan, sortOpers[1], limitOper);
+ // LIMIT in this case should be ordered. So we output unordered with key as task index
+ // and on the input we use ShuffledMergedInput to do ordered merge to retain sorted order.
+ output.addOutputKey(limitOper.getOperatorKey().toString());
+ output.setTaskIndexWithRecordIndexAsKey(true);
+ edge = curTezOp.inEdges.get(sortOpers[1].getOperatorKey());
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+ // POValueOutputTez will write key (task index, record index) in
+ // sorted order. So using OnFileUnorderedKVOutput instead of OnFileSortedOutput.
+ // But input needs to be merged in sorted order and requires ShuffledMergedInput
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledMergedInput.class.getName();
+ edge.setIntermediateOutputKeyClass(TezCompilerUtil.TUPLE_CLASS);
+ edge.setIntermediateOutputKeyComparatorClass(PigTupleWritableComparator.class.getName());
+
+ // Then add a POValueInputTez to the start of the new tezOp followed by a LIMIT
+ POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+ input.setAlias(op.getAlias());
+ input.setInputKey(sortOpers[1].getOperatorKey().toString());
+ curTezOp.plan.addAsLeaf(input);
+
+ POLimit limit = new POLimit(OperatorKey.genOpKey(scope));
+ limit.setLimit(op.getLimit());
+ curTezOp.plan.addAsLeaf(limit);
+ }
+
phyToTezOpMap.put(op, curTezOp);
}catch(Exception e){
int errCode = 2034;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jul 7 20:08:14 2014
@@ -47,6 +47,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
@@ -61,7 +62,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
@@ -130,6 +130,7 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
/**
@@ -242,10 +243,15 @@ public class TezDagBuilder extends TezOp
EdgeProperty edgeProperty = newEdge(fromOp, toOp);
- String groupInputClass = edgeProperty.getDataMovementType().equals(
- DataMovementType.SCATTER_GATHER)
- ? SortedGroupedMergedInput.class.getName()
- : ConcatenatedMergedKeyValueInput.class.getName();
+ String groupInputClass = ConcatenatedMergedKeyValueInput.class.getName();
+
+ // In case of SCATTER_GATHER and ShuffledUnorderedKVInput it will still be
+ // ConcatenatedMergedKeyValueInput
+ if(edgeProperty.getDataMovementType().equals(DataMovementType.SCATTER_GATHER)
+ && edgeProperty.getEdgeDestination().getClassName().equals(ShuffledMergedInput.class.getName())) {
+ groupInputClass = SortedGroupedMergedInput.class.getName();
+ }
+
return new GroupInputEdge(from, to, edgeProperty,
new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
}
@@ -533,7 +539,7 @@ public class TezDagBuilder extends TezOp
}
}
}
- JobControlCompiler.setOutputFormat(payloadConf);
+ JobControlCompiler.setOutputFormat(job);
// set parent plan in all operators. currently the parent plan is really
// used only when POStream, POSplit are present in the plan
@@ -631,7 +637,7 @@ public class TezDagBuilder extends TezOp
}
}
sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
-
+
if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) {
// set estimate parallelism for order by/skewed join to sampler parallelism
// that include:
@@ -652,7 +658,7 @@ public class TezDagBuilder extends TezOp
// Take our assembled configuration and create a vertex
byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
-
+
Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
@@ -728,7 +734,7 @@ public class TezDagBuilder extends TezOp
if (stores.size() > 0) {
new PigOutputFormat().checkOutputSpecs(job);
}
-
+
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
@@ -1147,7 +1153,7 @@ public class TezDagBuilder extends TezOp
conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
comparatorClass);
}
-
+
public static int estimateParallelism(Job job, TezOperPlan tezPlan,
TezOperator tezOp) throws IOException {
Configuration conf = job.getConfiguration();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Jul 7 20:08:14 2014
@@ -41,19 +41,19 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.pig.tools.pigstats.tez.TezStats;
import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
/**
* Main class that launches pig for Tez
@@ -67,8 +67,8 @@ public class TezLauncher extends Launche
@Override
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
- if (pc.defaultParallel == -1 &&
- !Boolean.parseBoolean(pc.getProperties().getProperty("pig.tez.auto.parallelism", "true"))) {
+ if (pc.defaultParallel == -1 &&
+ !Boolean.parseBoolean(pc.getProperties().getProperty(PigConfiguration.TEZ_AUTO_PARALLELISM, "true"))) {
pc.defaultParallel = 1;
}
aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning", "false"));
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Mon Jul 7 20:08:14 2014
@@ -99,12 +99,20 @@ public class TezPrinter extends TezOpPla
@Override
public void visitTezOp(TezOperator tezOper) throws VisitorException {
- buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
+ if (tezOper.isVertexGroup()) {
+ buf.append("Tez vertex group " + tezOper.getOperatorKey().toString());
+ } else {
+ buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
+ }
List<TezOperator> succs = mPlan.getSuccessors(tezOper);
if (succs != null) {
buf.append("\t->\t");
for (TezOperator op : succs) {
- buf.append("Tez vertex " + op.getOperatorKey().toString()).append(",");
+ if (op.isVertexGroup()) {
+ buf.append("Tez vertex group " + op.getOperatorKey().toString()).append(",");
+ } else {
+ buf.append("Tez vertex " + op.getOperatorKey().toString()).append(",");
+ }
}
}
buf.append("\n");
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Mon Jul 7 20:08:14 2014
@@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -41,8 +40,8 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
/**
* Optimizes union by removing the intermediate union vertex and making the
@@ -207,15 +206,8 @@ public class UnionOptimizer extends TezO
if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
edge.dataMovementType = DataMovementType.SCATTER_GATHER;
edge.partitionerClass = RoundRobinPartitioner.class;
- edge.outputClassName = OnFileSortedOutput.class.getName();
- edge.inputClassName = ShuffledMergedInput.class.getName();
-
- for (TezOutput tezOutput : valueOnlyOutputs) {
- if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
- edge.setIntermediateOutputKeyComparatorClass(
- POValueOutputTez.EmptyWritableComparator.class.getName());
- }
- }
+ edge.outputClassName = OnFileUnorderedPartitionedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
}
TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Jul 7 20:08:14 2014
@@ -177,7 +177,8 @@ public class GruntParser extends PigScri
mNumFailedJobs++;
Exception exp = (js.getException() != null) ? js.getException()
: new ExecException(
- "Job " + js.getName() + " failed, hadoop does not return any error message",
+ "Job " + (js.getJobId() == null ? "" : js.getJobId() + " ") +
+ "failed, hadoop does not return any error message",
2244);
LogUtils.writeLog(exp,
mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Mon Jul 7 20:08:14 2014
@@ -120,8 +120,8 @@ public class TezTaskStats extends JobSta
String filename = sto.getSFile().getFileName();
if (map != null) {
if (sto.isMultiStore()) {
- Long n = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP)
- .get(PigStatsUtil.getMultiStoreCounterName(sto));
+ Map<String, Long> msGroup = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ Long n = msGroup == null ? null: msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
if (n != null) records = n;
} else if (map.get(TASK_COUNTER_GROUP) != null
&& map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jul 7 20:08:14 2014
@@ -2056,12 +2056,11 @@ store d into ':OUTPATH:';\,
},
{
'num' => 2,
- 'ignore23' => 'The record limit pick is different in 23',
'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k';
-b = order a by $0, $1;
+b = order a by $0, $1, $2;
c = limit b 100;
store c into ':OUTPATH:';\,
- 'sortArgs' => ['-t', ' ', '-k', '1,2'],
+ 'sortArgs' => ['-t', ' ', '-k', '1,3'],
},
{
# Make sure that limit higher than number of rows doesn't mess stuff up
@@ -2604,7 +2603,7 @@ register :FUNCPATH:/testudf.jar;
a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double);
b = foreach a generate CONCAT('(', name), CONCAT((chararray)age, ' )');
store b into ':OUTPATH:.intermediate' using PigStorage(',');
-c = load ':OUTPATH:.intermediate' using org.apache.pig.test.udf.storefunc.DumpLoader();
+c = load ':OUTPATH:.intermediate' using DumpLoader();
store c into ':OUTPATH:';\,
'notmq' => 1,
Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Mon Jul 7 20:08:14 2014
@@ -58,7 +58,9 @@ abstract public class MiniGenericCluster
if (INSTANCE == null) {
String execType = System.getProperty("test.exec.type");
if (execType == null) {
- throw new RuntimeException("test.exec.type is not set");
+ // Default to MR
+ System.setProperty("test.exec.type", EXECTYPE_MR);
+ return buildCluster(EXECTYPE_MR);
}
return buildCluster(execType);