You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/11 16:57:16 UTC
svn commit: r1586670 [1/2] - in /pig/branches/tez: ivy/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/
src/org/apache/pig/tools/pigstats/
src/org/apache/pig/tools/pigstats/mapred...
Author: rohini
Date: Fri Apr 11 14:57:15 2014
New Revision: 1586670
URL: http://svn.apache.org/r1586670
Log:
PIG-3842: Pig on tez job hangs when AM has a failure and Multiquery fixes (rohini)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
Removed:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC6.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC8.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC9.gld
Modified:
pig/branches/tez/ivy/libraries.properties
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Fri Apr 11 14:57:15 2014
@@ -91,5 +91,5 @@ mockito.version=1.8.4
jansi.version=1.9
asm.version=3.3.1
snappy.version=1.1.0.1
-tez.version=0.3.0-incubating
+tez.version=0.4.0-incubating
parquet-pig-bundle.version=1.2.3
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Fri Apr 11 14:57:15 2014
@@ -23,8 +23,6 @@ import java.util.Map.Entry;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -50,26 +48,15 @@ public class MultiQueryOptimizerTez exte
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
List<TezOperator> succ_successors = new ArrayList<TezOperator>();
for (TezOperator successor : successors) {
- // don't want to be complicated by nested split
- if (successor.isSplitter()) {
- continue;
- }
+
// If has other dependency, don't merge into split,
if (getPlan().getPredecessors(successor).size()!=1) {
continue;
}
- boolean containsBlacklistedOp = false;
- for (PhysicalOperator op : successor.plan) {
- if (op instanceof POReservoirSample || op instanceof POPoissonSample) {
- containsBlacklistedOp = true;
- break;
- }
- }
- if (containsBlacklistedOp) {
- continue;
- }
+
// Detect diamond shape, we cannot merge it into split, since Tez
// does not handle double edge between vertexes
+ // TODO: PIG-3876 to handle this by writing to same edge
boolean sharedSucc = false;
if (getPlan().getSuccessors(successor)!=null) {
for (TezOperator succ_successor : getPlan().getSuccessors(successor)) {
@@ -102,8 +89,6 @@ public class MultiQueryOptimizerTez exte
tezOp.plan.remove(firstNodeLeaf);
singleSplitee.plan.remove(secondNodeRoot);
- //TODO remove filter all
-
tezOp.plan.merge(singleSplitee.plan);
tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc);
@@ -113,6 +98,7 @@ public class MultiQueryOptimizerTez exte
} else {
POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0);
POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope()));
+ split.setAlias(valueOutput.getAlias());
for (TezOperator splitee : splittees) {
PhysicalOperator spliteeRoot = splitee.plan.getRoots().get(0);
splitee.plan.remove(spliteeRoot);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Fri Apr 11 14:57:15 2014
@@ -27,19 +27,24 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
* POStoreTez is used to write to a Tez MROutput
*/
-public class POStoreTez extends POStore implements TezOutput {
+public class POStoreTez extends POStore implements TezOutput, TezTaskConfigurable {
private static final long serialVersionUID = 1L;
private transient MROutput output;
private transient KeyValueWriter writer;
private String outputKey;
+ private TezCounter outputRecordCounter;
public POStoreTez(OperatorKey k) {
super(k);
@@ -65,6 +70,24 @@ public class POStoreTez extends POStore
}
@Override
+ public void initialize(TezProcessorContext processorContext)
+ throws ExecException {
+ if (isMultiStore()) {
+ CounterGroup multiStoreGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (multiStoreGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+ if (name != null) {
+ outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+ }
+ }
+ }
+
+ @Override
public void replaceOutput(String oldOutputKey, String newOutputKey) {
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Apr 11 14:57:15 2014
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Apr 11 14:57:15 2014
@@ -671,11 +671,7 @@ public class TezCompiler extends PhyPlan
tezOp.plan.addAsLeaf(lr);
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
- if (tezOp.getSplitOperatorKey() != null) {
- inputKeys.add(tezOp.getSplitOperatorKey().toString());
- } else {
- inputKeys.add(tezOp.getOperatorKey().toString());
- }
+ inputKeys.add(tezOp.getOperatorKey().toString());
// Configure broadcast edges for replicated tables
edge.dataMovementType = DataMovementType.BROADCAST;
@@ -1971,9 +1967,11 @@ public class TezCompiler extends PhyPlan
splitOp.setSplitter(true);
phyToTezOpMap.put(op, splitOp);
output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ output.setAlias(op.getAlias());
splitOp.plan.addAsLeaf(output);
}
curTezOp = getTezOp();
+ curTezOp.setSplitParent(splitOp.getOperatorKey());
tezPlan.add(curTezOp);
output.addOutputKey(curTezOp.getOperatorKey().toString());
TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
@@ -1981,6 +1979,7 @@ public class TezCompiler extends PhyPlan
TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
curTezOp.setRequestedParallelismByReference(splitOp);
POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+ input.setAlias(op.getAlias());
input.setInputKey(splitOp.getOperatorKey().toString());
curTezOp.plan.addAsLeaf(input);
} catch (Exception e) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Apr 11 14:57:15 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -92,6 +93,7 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -119,7 +121,7 @@ import org.apache.tez.runtime.library.in
public class TezDagBuilder extends TezOpPlanVisitor {
private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
- private TezDAG dag;
+ private DAG dag;
private Map<String, LocalResource> localResources;
private PigContext pc;
private Configuration globalConf;
@@ -127,7 +129,7 @@ public class TezDagBuilder extends TezOp
private String scope;
private NodeIdGenerator nig;
- public TezDagBuilder(PigContext pc, TezOperPlan plan, TezDAG dag,
+ public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pc;
@@ -136,6 +138,14 @@ public class TezDagBuilder extends TezOp
this.dag = dag;
this.scope = plan.getRoots().get(0).getOperatorKey().getScope();
this.nig = NodeIdGenerator.getGenerator();
+
+ try {
+ // Add credentials from binary token file and get tokens for namenodes
+ // specified in mapreduce.job.hdfs-servers
+ SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+ } catch (IOException e) {
+ throw new RuntimeException("Error while fetching delegation tokens", e);
+ }
}
@Override
@@ -358,12 +368,17 @@ public class TezDagBuilder extends TezOp
ProcessorDescriptor procDesc = new ProcessorDescriptor(
tezOp.getProcessorName());
+ // Pass physical plans to vertex as user payload.
+ JobConf payloadConf = new JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
+
+ // We do this so that dag.getCredentials(), job.getCredentials(),
+ // job.getConfiguration().getCredentials() all reference the same Credentials object
+ // Unfortunately there is no setCredentials() on Job
+ payloadConf.setCredentials(dag.getCredentials());
// We won't actually use this job, but we need it to talk with the Load Store funcs
@SuppressWarnings("deprecation")
- Job job = new Job(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
-
- // Pass physical plans to vertex as user payload.
- Configuration payloadConf = job.getConfiguration();
+ Job job = new Job(payloadConf);
+ payloadConf = (JobConf) job.getConfiguration();
if (tezOp.sampleOperator != null) {
payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString());
@@ -574,19 +589,13 @@ public class TezDagBuilder extends TezOp
storeOutDescriptor, MROutputCommitter.class);
}
- if (stores.size() > 0) {
- new PigOutputFormat().checkOutputSpecs(job);
- }
-
// LoadFunc and StoreFunc add delegation tokens to Job Credentials in
// setLocation and setStoreLocation respectively. For eg: HBaseStorage
// InputFormat add delegation token in getSplits and OutputFormat in
// checkOutputSpecs. For eg: FileInputFormat and FileOutputFormat
- dag.getCredentials().addAll(job.getCredentials());
-
- // Add credentials from binary token file and get tokens for namenodes
- // specified in mapreduce.job.hdfs-servers
- SecurityHelper.populateTokenCache(job.getConfiguration(), dag.getCredentials());
+ if (stores.size() > 0) {
+ new PigOutputFormat().checkOutputSpecs(job);
+ }
return vertex;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Apr 11 14:57:15 2014
@@ -19,9 +19,11 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +34,7 @@ import org.apache.tez.client.TezSession;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
@@ -51,15 +54,16 @@ public class TezJob extends ControlledJo
private EnumSet<StatusGetOpts> statusGetOpts;
private DAGStatus dagStatus;
private Configuration conf;
- private TezDAG dag;
+ private DAG dag;
private DAGClient dagClient;
private Map<String, LocalResource> requestAMResources;
private TezSession tezSession;
private boolean reuseSession;
private TezCounters dagCounters;
- private Map<String, Map<String, Long>> vertexCounters;
+ // Vertex, CounterGroup, Counter, Value
+ private Map<String, Map<String, Map<String, Long>>> vertexCounters;
- public TezJob(TezConfiguration conf, TezDAG dag, Map<String, LocalResource> requestAMResources)
+ public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
throws IOException {
super(conf);
this.conf = conf;
@@ -70,7 +74,7 @@ public class TezJob extends ControlledJo
this.vertexCounters = Maps.newHashMap();
}
- public TezDAG getDag() {
+ public DAG getDag() {
return dag;
}
@@ -82,8 +86,12 @@ public class TezJob extends ControlledJo
return dagCounters;
}
- public Map<String, Long> getVertexCounters(String name) {
- return vertexCounters.get(name);
+ public Map<String, Map<String, Long>> getVertexCounters(String group) {
+ return vertexCounters.get(group);
+ }
+
+ public Map<String, Long> getVertexCounters(String group, String name) {
+ return vertexCounters.get(group).get(name);
}
@Override
@@ -149,17 +157,20 @@ public class TezJob extends ControlledJo
String name = v.getVertexName();
try {
VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
- Map<String, Long> cntMap = Maps.newHashMap();
TezCounters counters = s.getVertexCounters();
+ Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
Iterator<CounterGroup> grpIt = counters.iterator();
while (grpIt.hasNext()) {
- Iterator<TezCounter> cntIt = grpIt.next().iterator();
+ CounterGroup grp = grpIt.next();
+ Iterator<TezCounter> cntIt = grp.iterator();
+ Map<String, Long> cntMap = Maps.newHashMap();
while (cntIt.hasNext()) {
TezCounter cnt = cntIt.next();
cntMap.put(cnt.getName(), cnt.getValue());
}
+ grpCounters.put(grp.getName(), cntMap);
}
- vertexCounters.put(name, cntMap);
+ vertexCounters.put(name, grpCounters);
} catch (Exception e) {
// Don't fail the job even if vertex counters couldn't
// be retrieved.
@@ -201,5 +212,24 @@ public class TezJob extends ControlledJo
}
return jobState;
}
+
+ @Override
+ public synchronized String getMessage() {
+ return super.getMessage() + "\n " + getDiagnostics();
+ }
+
+ private String getDiagnostics() {
+ try {
+ if (dagClient != null && dagStatus == null) {
+ dagStatus = dagClient.getDAGStatus(new HashSet<StatusGetOpts>());
+ }
+ if (dagStatus != null) {
+ return StringUtils.join(dagStatus.getDiagnostics(), "\n");
+ }
+ } catch (Exception e) {
+ //Ignore
+ }
+ return "";
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Fri Apr 11 14:57:15 2014
@@ -17,26 +17,29 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop23.PigJobControl;
import org.apache.pig.tools.pigstats.tez.TezStats;
public class TezJobControl extends PigJobControl {
-
+
+ private static final Log LOG = LogFactory.getLog(TezJobControl.class);
private TezJobNotifier notifier = null;
private TezStats stats = null;
public TezJobControl(String groupName, int timeToSleep) {
super(groupName, timeToSleep);
}
-
+
public void setJobNotifier(TezJobNotifier notifier) {
this.notifier = notifier;
}
-
+
public void setTezStats(TezStats stats) {
this.stats = stats;
}
-
+
@Override
public void run() {
try {
@@ -57,13 +60,21 @@ public class TezJobControl extends PigJo
if (stats!=null) {
stats.accumulateStats(this);
}
- if (notifier!=null) {
+ if (notifier != null) {
notifier.complete(this);
+ notifier = null;
}
}
} catch (Exception e) {
// should not happen
+ LOG.error("Unexpected error", e);
throw new RuntimeException(e);
+ } finally {
+ // Try notify if not notified. Else process will hang.
+ if (notifier != null) {
+ notifier.complete(this);
+ notifier = null;
+ }
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Fri Apr 11 14:57:15 2014
@@ -24,11 +24,13 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.impl.PigContext;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
/**
@@ -47,10 +49,11 @@ public class TezJobControlCompiler {
this.tezConf = new TezConfiguration(conf);
}
- public TezDAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+ public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
throws IOException, YarnException {
String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
- TezDAG tezDag = new TezDAG(jobName);
+ DAG tezDag = new DAG(jobName);
+ tezDag.setCredentials(new Credentials());
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
dagBuilder.visit();
return tezDag;
@@ -98,7 +101,7 @@ public class TezJobControlCompiler {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.putAll(planContainer.getLocalResources());
localResources.putAll(tezPlan.getExtraResources());
- TezDAG tezDag = buildDAG(tezPlan, localResources);
+ DAG tezDag = buildDAG(tezPlan, localResources);
return new TezJob(tezConf, tezDag, localResources);
} catch (Exception e) {
int errCode = 2017;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Apr 11 14:57:15 2014
@@ -32,6 +32,7 @@ import org.apache.pig.backend.BackendExc
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -151,6 +152,9 @@ public class TezLauncher extends Launche
TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
TezOperPlan tezPlan = comp.compile();
+ NoopFilterRemover filter = new NoopFilterRemover(tezPlan);
+ filter.visit();
+
boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
PigConfiguration.PROP_NO_COMBINER, "false"));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri Apr 11 14:57:15 2014
@@ -142,7 +142,7 @@ public class TezOperPlan extends Operato
@Override
public boolean disconnect(TezOperator from, TezOperator to) {
from.outEdges.remove(to.getOperatorKey());
- to.outEdges.remove(from.getOperatorKey());
+ to.inEdges.remove(from.getOperatorKey());
return super.disconnect(from, to);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri Apr 11 14:57:15 2014
@@ -70,14 +70,12 @@ public class TezOperator extends Operato
//int requestedMemory = 1024;
//int requestedCpu = 1;
- // Presence indicates that this TezOper is sub-plan of a POSplit.
- // This is in-case when multi-query is turned on
- // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
- private OperatorKey splitOperatorKey = null;
-
// This indicates that this TezOper is a split operator
private boolean splitter;
+ // This indicates that this TezOper has POSplit as a predecessor.
+ private OperatorKey splitParent = null;
+
// Indicates that the plan creation is complete
boolean closed = false;
@@ -177,12 +175,12 @@ public class TezOperator extends Operato
this.requestedParallelism = oper.requestedParallelism;
}
- public OperatorKey getSplitOperatorKey() {
- return splitOperatorKey;
+ public OperatorKey getSplitParent() {
+ return splitParent;
}
- public void setSplitOperatorKey(OperatorKey splitOperatorKey) {
- this.splitOperatorKey = splitOperatorKey;
+ public void setSplitParent(OperatorKey splitParent) {
+ this.splitParent = splitParent;
}
public void setSplitter(boolean spl) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Apr 11 14:57:15 2014
@@ -86,6 +86,8 @@ public class TezSessionManager {
while (true) {
TezSessionStatus status = tezSession.getSessionStatus();
if (status.equals(TezSessionStatus.SHUTDOWN)) {
+ //TODO: TEZ-1017 Show diagnostics message
+ //log.error("TezSession has already shutdown. Diagnostics: " + tezSession.getSessionDiagnostics());
throw new RuntimeException("TezSession has already shutdown");
}
if (status.equals(TezSessionStatus.READY)) {
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java Fri Apr 11 14:57:15 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes these.
+ *
+ * The condition we look for is POFilters with a constant boolean
+ * (true) expression as its plan.
+ */
+public class NoopFilterRemover extends TezOpPlanVisitor {
+
+ private static Log LOG = LogFactory.getLog(NoopFilterRemover.class);
+
+ public NoopFilterRemover(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ if (tezOp.getSplitParent() == null) {
+ return;
+ }
+ try {
+ List<POFilter> filters = PlanHelper.getPhysicalOperators(tezOp.plan, POFilter.class);
+ for (POFilter filter : filters) {
+ PhysicalPlan filterPlan = filter.getPlan();
+ if (filterPlan.size() == 1) {
+ PhysicalOperator fp = filterPlan.getRoots().get(0);
+ if (fp instanceof ConstantExpression) {
+ ConstantExpression exp = (ConstantExpression)fp;
+ Object value = exp.getValue();
+ if (value instanceof Boolean) {
+ Boolean filterValue = (Boolean)value;
+ if (filterValue) {
+ tezOp.plan.removeAndReconnect(filter);
+ }
+ }
+ }
+ }
+ }
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java Fri Apr 11 14:57:15 2014
@@ -18,19 +18,27 @@
package org.apache.pig.tools.pigstats;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
/**
@@ -41,6 +49,8 @@ import org.apache.pig.tools.pigstats.Pig
@InterfaceStability.Evolving
public abstract class JobStats extends Operator {
+ private static final Log LOG = LogFactory.getLog(JobStats.class);
+
public static final String ALIAS = "JobStatistics:alias";
public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
public static final String FEATURE = "JobStatistics:feature";
@@ -321,4 +331,37 @@ public abstract class JobStats extends O
@Deprecated
abstract public Map<String, Long> getMultiInputCounters();
+ /**
+ * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
+ * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
+ * defaults to FileBasedOutputSizeReader.
+ * @param sto POStore
+ * @param conf configuration
+ */
+ public static long getOutputSize(POStore sto, Configuration conf) {
+ PigStatsOutputSizeReader reader = null;
+ String readerNames = conf.get(
+ PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
+ FileBasedOutputSizeReader.class.getCanonicalName());
+
+ for (String className : readerNames.split(",")) {
+ reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
+ if (reader.supports(sto)) {
+ LOG.info("using output size reader: " + className);
+ try {
+ return reader.getOutputSize(sto, conf);
+ } catch (FileNotFoundException e) {
+ LOG.warn("unable to find the output file", e);
+ return -1;
+ } catch (IOException e) {
+ LOG.warn("unable to get byte written of the job", e);
+ return -1;
+ }
+ }
+ }
+
+ LOG.warn("unable to find an output size reader");
+ return -1;
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Apr 11 14:57:15 2014
@@ -23,8 +23,8 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
/**
@@ -45,31 +45,12 @@ public class PigStatsUtil {
public static final String HDFS_BYTES_READ
= "HDFS_BYTES_READ";
- /**
- * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_RECORD_COUNTER} instead.
- */
- @Deprecated
public static final String MULTI_INPUTS_RECORD_COUNTER
= "Input records from ";
-
- /**
- * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_COUNTER_GROUP} instead.
- */
- @Deprecated
public static final String MULTI_INPUTS_COUNTER_GROUP
= "MultiInputCounters";
-
- /**
- * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_RECORD_COUNTER} instead.
- */
- @Deprecated
public static final String MULTI_STORE_RECORD_COUNTER
= "Output records in ";
-
- /**
- * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_COUNTER_GROUP} instead.
- */
- @Deprecated
public static final String MULTI_STORE_COUNTER_GROUP
= "MultiStoreCounters";
@@ -149,4 +130,56 @@ public class PigStatsUtil {
PigStats.start(new EmbeddedPigStats(statsMap));
}
+ /**
+ * Returns the counter name for the given input file name
+ *
+ * @param fname the input file name
+ * @return the counter name
+ */
+ public static String getMultiInputsCounterName(String fname, int index) {
+ String shortName = getShortName(fname);
+ return (shortName == null) ? null
+ : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
+ }
+
+ /**
+ * Returns the counter name for the given {@link POStore}
+ *
+ * @param store the POStore
+ * @return the counter name
+ */
+ public static String getMultiStoreCounterName(POStore store) {
+ String shortName = getShortName(store.getSFile().getFileName());
+ return (shortName == null) ? null
+ : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
+ }
+
+ // Restrict total string size of a counter name to 64 characters.
+ // Leave 24 characters for prefix string.
+ private static final int COUNTER_NAME_LIMIT = 40;
+ private static final String SEPARATOR = "/";
+ private static final String SEMICOLON = ";";
+
+ private static String getShortName(String uri) {
+ int scolon = uri.indexOf(SEMICOLON);
+ int slash;
+ if (scolon!=-1) {
+ slash = uri.lastIndexOf(SEPARATOR, scolon);
+ } else {
+ slash = uri.lastIndexOf(SEPARATOR);
+ }
+ String shortName = null;
+ if (scolon==-1) {
+ shortName = uri.substring(slash+1);
+ }
+ if (slash < scolon) {
+ shortName = uri.substring(slash+1, scolon);
+ }
+ if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) {
+ shortName = shortName.substring(shortName.length()
+ - COUNTER_NAME_LIMIT);
+ }
+ return shortName;
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Apr 11 14:57:15 2014
@@ -18,7 +18,6 @@
package org.apache.pig.tools.pigstats.mapreduce;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,24 +30,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -118,58 +114,80 @@ public final class MRJobStats extends Jo
private Counters counters = null;
+ @Override
public String getJobId() {
return (jobId == null) ? null : jobId.toString();
}
+ @Override
public int getNumberMaps() { return numberMaps; }
+ @Override
public int getNumberReduces() { return numberReduces; }
+ @Override
public long getMaxMapTime() { return maxMapTime; }
+ @Override
public long getMinMapTime() { return minMapTime; }
+ @Override
public long getAvgMapTime() { return avgMapTime; }
+ @Override
public long getMaxReduceTime() { return maxReduceTime; }
+ @Override
public long getMinReduceTime() { return minReduceTime; }
+ @Override
public long getAvgReduceTime() { return avgReduceTime; }
+ @Override
public long getMapInputRecords() { return mapInputRecords; }
+ @Override
public long getMapOutputRecords() { return mapOutputRecords; }
+ @Override
public long getReduceInputRecords() { return reduceInputRecords; }
+ @Override
public long getReduceOutputRecords() { return reduceOutputRecords; }
+ @Override
public long getSMMSpillCount() { return spillCount; }
+ @Override
public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
+ @Override
public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
+ @Override
public Counters getHadoopCounters() { return counters; }
+ @Override
public Map<String, Long> getMultiStoreCounters() {
return Collections.unmodifiableMap(multiStoreCounters);
}
+ @Override
public Map<String, Long> getMultiInputCounters() {
return Collections.unmodifiableMap(multiInputCounters);
}
+ @Override
public String getAlias() {
return (String)getAnnotation(ALIAS);
}
+ @Override
public String getAliasLocation() {
return (String)getAnnotation(ALIAS_LOCATION);
}
+ @Override
public String getFeature() {
return (String)getAnnotation(FEATURE);
}
@@ -219,6 +237,7 @@ public final class MRJobStats extends Jo
medianReduceTime = median;
}
+ @Override
public String getDisplayString(boolean local) {
StringBuilder sb = new StringBuilder();
String id = (jobId == null) ? "N/A" : jobId.toString();
@@ -420,39 +439,6 @@ public final class MRJobStats extends Jo
}
}
- /**
- * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
- * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
- * defaults to FileBasedOutputSizeReader.
- * @param sto POStore
- * @param conf configuration
- */
- static long getOutputSize(POStore sto, Configuration conf) {
- PigStatsOutputSizeReader reader = null;
- String readerNames = conf.get(
- PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
- FileBasedOutputSizeReader.class.getCanonicalName());
-
- for (String className : readerNames.split(",")) {
- reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
- if (reader.supports(sto)) {
- LOG.info("using output size reader: " + className);
- try {
- return reader.getOutputSize(sto, conf);
- } catch (FileNotFoundException e) {
- LOG.warn("unable to find the output file", e);
- return -1;
- } catch (IOException e) {
- LOG.warn("unable to get byte written of the job", e);
- return -1;
- }
- }
- }
-
- LOG.warn("unable to find an output size reader");
- return -1;
- }
-
private void addOneOutputStats(POStore sto) {
long records = -1;
if (sto.isMultiStore()) {
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Apr 11 14:57:15 2014
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats.ma
import java.io.IOException;
import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
@@ -32,16 +33,13 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.JobStats;
@@ -54,21 +52,9 @@ public class MRPigStatsUtil extends PigS
= "org.apache.hadoop.mapred.Task$Counter";
public static final String FS_COUNTER_GROUP
= HadoopShims.getFsCounterGroupName();
- public static final String MULTI_INPUTS_RECORD_COUNTER
- = "Input records from ";
- public static final String MULTI_INPUTS_COUNTER_GROUP
- = "MultiInputCounters";
- public static final String MULTI_STORE_RECORD_COUNTER
- = "Output records in ";
- public static final String MULTI_STORE_COUNTER_GROUP
- = "MultiStoreCounters";
private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
- // Restrict total string size of a counter name to 64 characters.
- // Leave 24 characters for prefix string.
- private static final int COUNTER_NAME_LIMIT = 40;
-
/**
* Returns the count for the given counter name in the counter group
* 'MultiStoreCounters'
@@ -95,55 +81,6 @@ public class MRPigStatsUtil extends PigS
}
/**
- * Returns the counter name for the given {@link POStore}
- *
- * @param store the POStore
- * @return the counter name
- */
- public static String getMultiStoreCounterName(POStore store) {
- String shortName = getShortName(store.getSFile().getFileName());
- return (shortName == null) ? null
- : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
- }
-
- /**
- * Returns the counter name for the given input file name
- *
- * @param fname the input file name
- * @return the counter name
- */
- public static String getMultiInputsCounterName(String fname, int index) {
- String shortName = getShortName(fname);
- return (shortName == null) ? null
- : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
- }
-
- private static final String SEPARATOR = "/";
- private static final String SEMICOLON = ";";
-
- private static String getShortName(String uri) {
- int scolon = uri.indexOf(SEMICOLON);
- int slash;
- if (scolon!=-1) {
- slash = uri.lastIndexOf(SEPARATOR, scolon);
- } else {
- slash = uri.lastIndexOf(SEPARATOR);
- }
- String shortName = null;
- if (scolon==-1) {
- shortName = uri.substring(slash+1);
- }
- if (slash < scolon) {
- shortName = uri.substring(slash+1, scolon);
- }
- if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) {
- shortName = shortName.substring(shortName.length()
- - COUNTER_NAME_LIMIT);
- }
- return shortName;
- }
-
- /**
* Starts collecting statistics for the given MR plan
*
* @param pc the Pig context
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Fri Apr 11 14:57:15 2014
@@ -23,6 +23,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +45,9 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
@@ -55,12 +59,9 @@ import com.google.common.collect.Maps;
public class TezStats extends PigStats {
private static final Log LOG = LogFactory.getLog(TezStats.class);
- public static final String DAG_COUNTER =
- "org.apache.tez.common.counters.DAGCounter";
- public static final String FS_COUNTER =
- "org.apache.tez.common.counters.FileSystemCounter";
- public static final String TASK_COUNTER =
- "org.apache.tez.common.counters.TaskCounter";
+ public static final String DAG_COUNTER_GROUP = DAGCounter.class.getName();
+ public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
+ public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
private List<String> dagStatsStrings;
private Map<String, TezTaskStats> tezOpVertexMap;
@@ -152,7 +153,9 @@ public class TezStats extends PigStats {
String[] lines = errorMessage.split("\n");
for (int i = 0; i < lines.length; i++) {
String s = lines[i].trim();
- sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+ if (i == 0 || !StringUtils.isEmpty(s)) {
+ sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+ }
}
sb.append("\n");
}
@@ -163,15 +166,14 @@ public class TezStats extends PigStats {
sb.append("\n");
}
- List<InputStats> is = getInputStats();
- for (int i = 0; i < is.size(); i++) {
- String s = is.get(i).getDisplayString(isLocal).trim();
- sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "Input(s)" : "", s));
- }
- List<OutputStats> os = getOutputStats();
- for (int i = 0; i < os.size(); i++) {
- String s = os.get(i).getDisplayString(isLocal).trim();
- sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "Output(s)" : "", s));
+ sb.append("Input(s):\n");
+ for (InputStats is : getInputStats()) {
+ sb.append(is.getDisplayString(isLocal).trim()).append("\n");
+ }
+ sb.append("\n");
+ sb.append("Output(s):\n");
+ for (OutputStats os : getOutputStats()) {
+ sb.append(os.getDisplayString(isLocal).trim()).append("\n");
}
LOG.info("Script Statistics:\n" + sb.toString());
}
@@ -203,33 +205,42 @@ public class TezStats extends PigStats {
}
}
if (!succeeded) {
- errorMessage = tezJob.getMessage();
+ errorMessage = tezJob.getMessage().trim();
}
}
private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
- Map<String, Long> counters) {
+ Map<String, Map<String, Long>> map) {
TezTaskStats stats = tezOpVertexMap.get(tezOpName);
stats.setConf(conf);
stats.setId(tezOpName);
stats.setSuccessful(succeeded);
- stats.addInputStatistics(counters);
- stats.addOutputStatistics(counters);
+ if (map == null) {
+ if (stats.hasLoadOrStore()) {
+ LOG.warn("Unable to get input(s)/output(s) of the job");
+ }
+ } else {
+ stats.addInputStatistics(map);
+ stats.addOutputStatistics(map);
+ }
}
private static String getDisplayString(TezJob tezJob) {
StringBuilder sb = new StringBuilder();
TezCounters cnt = tezJob.getDagCounters();
+ if (cnt == null) {
+ return "";
+ }
sb.append(String.format("%1$20s: %2$-100s%n", "JobId",
tezJob.getJobID()));
- CounterGroup dagGrp = cnt.getGroup(DAG_COUNTER);
+ CounterGroup dagGrp = cnt.getGroup(DAG_COUNTER_GROUP);
TezCounter numTasks = dagGrp.findCounter("TOTAL_LAUNCHED_TASKS");
sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
numTasks.getValue()));
- CounterGroup fsGrp = cnt.getGroup(FS_COUNTER);
+ CounterGroup fsGrp = cnt.getGroup(FS_COUNTER_GROUP);
TezCounter bytesRead = fsGrp.findCounter("FILE_BYTES_READ");
TezCounter bytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN");
sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Fri Apr 11 14:57:15 2014
@@ -1,5 +1,8 @@
package org.apache.pig.tools.pigstats.tez;
+import static org.apache.pig.tools.pigstats.tez.TezStats.FS_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezStats.TASK_COUNTER_GROUP;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -20,12 +23,12 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.tez.common.counters.TaskCounter;
public class TezTaskStats extends JobStats {
private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
private String vertexName;
- private Configuration conf;
private List<POStore> stores = null;
private List<FileSpec> loads = null;
@@ -63,6 +66,7 @@ public class TezTaskStats extends JobSta
return sb.toString();
}
+ @Override
@SuppressWarnings("unchecked")
public void setConf(Configuration conf) {
super.setConf(conf);
@@ -78,9 +82,8 @@ public class TezTaskStats extends JobSta
}
}
- public void addInputStatistics(Map<String, Long> counters) {
- if (inputs == null) {
- LOG.warn("Unable to get inputs of the job");
+ public void addInputStatistics(Map<String, Map<String, Long>> map) {
+ if (loads == null) {
return;
}
@@ -88,11 +91,19 @@ public class TezTaskStats extends JobSta
long records = -1;
long hdfsBytesRead = -1;
String filename = fs.getFileName();
- if (counters.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
- records = counters.get(PigStatsUtil.MAP_INPUT_RECORDS);
+ Map<String, Long> taskCounter = map.get(TASK_COUNTER_GROUP);
+ if (taskCounter != null) {
+ //TaskCounter.INPUT_RECORDS_PROCESSED.name()
+ if (taskCounter.get("INPUT_RECORDS_PROCESSED") != null) {
+ records = taskCounter.get("INPUT_RECORDS_PROCESSED");
+ } else if (taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
+ // Tez 0.3 has MAP_INPUT_RECORDS TODO: Remove after we move away from Tez 0.3
+ records = taskCounter.get(PigStatsUtil.MAP_INPUT_RECORDS);
+ }
}
- if (counters.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
- hdfsBytesRead = counters.get(PigStatsUtil.HDFS_BYTES_READ);
+ if (map.get(FS_COUNTER_GROUP) !=null &&
+ map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
+ hdfsBytesRead = map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
}
InputStats is = new InputStats(filename, hdfsBytesRead,
records, (state == JobState.SUCCESS));
@@ -101,22 +112,32 @@ public class TezTaskStats extends JobSta
}
}
- public void addOutputStatistics(Map<String, Long> counters) {
+ public void addOutputStatistics(Map<String, Map<String, Long>> map) {
if (stores == null) {
- LOG.warn("Unable to get stores of the job");
return;
}
for (POStore sto : stores) {
long records = -1;
- long hdfsBytesWritten = -1;
+ long hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
String filename = sto.getSFile().getFileName();
- if (counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
- records = counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS);
+ if (sto.isMultiStore()) {
+ Long n = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP).get(PigStatsUtil.getMultiStoreCounterName(sto));
+ if (n != null) records = n;
+ } else if (map.get(TASK_COUNTER_GROUP) != null) {
+ if(map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+ records = map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
+ } else if(map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
+ // Tez 0.3 has MAP_OUTPUT_RECORDS TODO: Remove after we move away from Tez 0.3
+ records = map.get(TASK_COUNTER_GROUP).get(PigStatsUtil.MAP_OUTPUT_RECORDS);
+ }
}
- if (counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
- hdfsBytesWritten = counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+ /*
+ if (map.get(FS_COUNTER_GROUP)!= null &&
+ map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
+ hdfsBytesWritten = map.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
}
+ */
OutputStats os = new OutputStats(filename, hdfsBytesWritten,
records, (state == JobState.SUCCESS));
os.setPOStore(sto);
@@ -232,4 +253,12 @@ public class TezTaskStats extends JobSta
public Map<String, Long> getMultiInputCounters() {
throw new UnsupportedOperationException();
}
+
+ public boolean hasLoadOrStore() {
+ if ((loads != null && !loads.isEmpty())
+ || (stores != null && !stores.isEmpty())) {
+ return true;
+ }
+ return false;
+ }
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java Fri Apr 11 14:57:15 2014
@@ -38,8 +38,11 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.plan.VisitorException;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+// Ant also sends abstract classes to JUnit; the skipNonTests attribute does not work with all Ant versions
+@Ignore
public abstract class TestSecondarySort {
protected static MiniGenericCluster cluster = null;
protected PigServer pigServer;
@@ -476,7 +479,7 @@ public abstract class TestSecondarySort
Util.deleteFile(cluster, clusterFilePath);
}
-
+
@Test
// Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
public void testCustomPartitionerWithSort() throws Exception {
@@ -502,9 +505,9 @@ public abstract class TestSecondarySort
} catch (Exception e) {
captureException = true;
}
-
+
assertTrue(captureException);
-
+
Util.deleteFile(cluster, clusterPath);
}
}
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,65 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-66
+#--------------------------------------------------
+Tez vertex scope-58 -> Tez vertex scope-60,Tez vertex scope-62,Tez vertex scope-64,
+Tez vertex scope-64
+Tez vertex scope-62
+Tez vertex scope-60
+
+Tez vertex scope-58
+# Plan on vertex
+POValueOutputTez - scope-59 -> [scope-64, scope-62, scope-60]
+|
+|---a: New For Each(false,false)[bag] - scope-41
+ | |
+ | Cast[int] - scope-36
+ | |
+ | |---Project[bytearray][0] - scope-35
+ | |
+ | Cast[int] - scope-39
+ | |
+ | |---Project[bytearray][1] - scope-38
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-34
+Tez vertex scope-64
+# Plan on vertex
+d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-57
+|
+|---d: Filter[bag] - scope-53
+ | |
+ | Greater Than[boolean] - scope-56
+ | |
+ | |---Project[int][0] - scope-54
+ | |
+ | |---Constant(10) - scope-55
+ |
+ |---POValueInputTez - scope-65 <- scope-58
+Tez vertex scope-62
+# Plan on vertex
+c: Store(file:///tmp/output/c:org.apache.pig.builtin.PigStorage) - scope-52
+|
+|---c: Filter[bag] - scope-48
+ | |
+ | Less Than or Equal[boolean] - scope-51
+ | |
+ | |---Project[int][0] - scope-49
+ | |
+ | |---Constant(10) - scope-50
+ |
+ |---POValueInputTez - scope-63 <- scope-58
+Tez vertex scope-60
+# Plan on vertex
+b: Store(file:///tmp/output/b:org.apache.pig.builtin.PigStorage) - scope-47
+|
+|---b: Filter[bag] - scope-43
+ | |
+ | Less Than or Equal[boolean] - scope-46
+ | |
+ | |---Project[int][0] - scope-44
+ | |
+ | |---Constant(5) - scope-45
+ |
+ |---POValueInputTez - scope-61 <- scope-58
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,53 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-33
+#--------------------------------------------------
+Tez vertex scope-24
+
+Tez vertex scope-24
+# Plan on vertex
+1-1: Split - scope-32
+| |
+| b: Store(file:///tmp/output/b:org.apache.pig.builtin.PigStorage) - scope-13
+| |
+| |---b: Filter[bag] - scope-9
+| | |
+| | Less Than or Equal[boolean] - scope-12
+| | |
+| | |---Project[int][0] - scope-10
+| | |
+| | |---Constant(5) - scope-11
+| |
+| c: Store(file:///tmp/output/c:org.apache.pig.builtin.PigStorage) - scope-18
+| |
+| |---c: Filter[bag] - scope-14
+| | |
+| | Less Than or Equal[boolean] - scope-17
+| | |
+| | |---Project[int][0] - scope-15
+| | |
+| | |---Constant(10) - scope-16
+| |
+| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-23
+| |
+| |---d: Filter[bag] - scope-19
+| | |
+| | Greater Than[boolean] - scope-22
+| | |
+| | |---Project[int][0] - scope-20
+| | |
+| | |---Constant(10) - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,311 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-412
+#--------------------------------------------------
+Tez vertex scope-316 -> Tez vertex scope-318,Tez vertex scope-329,Tez vertex scope-340,
+Tez vertex scope-318 -> Tez vertex scope-321,Tez vertex scope-334,Tez vertex scope-345,
+Tez vertex scope-345 -> Tez vertex scope-348,Tez vertex scope-372,
+Tez vertex scope-348 -> Tez vertex scope-366,Tez vertex scope-356,
+Tez vertex scope-356 -> Tez vertex scope-366,
+Tez vertex scope-366 -> Tez vertex scope-368,
+Tez vertex scope-368
+Tez vertex scope-340 -> Tez vertex scope-343,Tez vertex scope-384,
+Tez vertex scope-384 -> Tez vertex scope-388,
+Tez vertex scope-372 -> Tez vertex scope-376,
+Tez vertex scope-376 -> Tez vertex scope-382,Tez vertex scope-386,
+Tez vertex scope-386 -> Tez vertex scope-388,
+Tez vertex scope-388
+Tez vertex scope-329 -> Tez vertex scope-332,Tez vertex scope-337,
+Tez vertex scope-337 -> Tez vertex scope-339,
+Tez vertex scope-339
+Tez vertex scope-382
+Tez vertex scope-321 -> Tez vertex scope-323,
+Tez vertex scope-323 -> Tez vertex scope-325,Tez vertex scope-327,
+Tez vertex scope-327
+Tez vertex scope-325
+Tez vertex scope-343
+Tez vertex scope-332 -> Tez vertex scope-336,
+Tez vertex scope-334 -> Tez vertex scope-336,
+Tez vertex scope-336
+
+Tez vertex scope-316
+# Plan on vertex
+POValueOutputTez - scope-317 -> [scope-340, scope-318, scope-329]
+|
+|---a: New For Each(false,false)[bag] - scope-217
+ | |
+ | Cast[int] - scope-212
+ | |
+ | |---Project[bytearray][0] - scope-211
+ | |
+ | Cast[int] - scope-215
+ | |
+ | |---Project[bytearray][1] - scope-214
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-210
+Tez vertex scope-318
+# Plan on vertex
+POValueOutputTez - scope-320 -> [scope-321, scope-345, scope-334]
+|
+|---b: Filter[bag] - scope-219
+ | |
+ | Less Than or Equal[boolean] - scope-222
+ | |
+ | |---Project[int][0] - scope-220
+ | |
+ | |---Constant(5) - scope-221
+ |
+ |---POValueInputTez - scope-319 <- scope-316
+Tez vertex scope-345
+# Plan on vertex
+POValueOutputTez - scope-347 -> [scope-348, scope-372]
+|
+|---POValueInputTez - scope-346 <- scope-318
+Tez vertex scope-348
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-352 -> scope-356
+| |
+| Constant(DummyVal) - scope-351
+|
+|---ReservoirSample - scope-355
+ |
+ |---New For Each(false)[tuple] - scope-354
+ | |
+ | Project[int][0] - scope-353
+ |
+ |---e1: Local Rearrange[tuple]{int}(false) - scope-350 -> scope-366
+ | |
+ | Project[int][0] - scope-298
+ |
+ |---e: Filter[bag] - scope-294
+ | |
+ | Less Than[boolean] - scope-297
+ | |
+ | |---Project[int][0] - scope-295
+ | |
+ | |---Constant(3) - scope-296
+ |
+ |---POValueInputTez - scope-349 <- scope-345
+Tez vertex scope-356
+# Plan on vertex
+POValueOutputTez - scope-365 -> [scope-366]
+|
+|---New For Each(false)[tuple] - scope-364
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-363
+ | |
+ | |---Project[tuple][*] - scope-362
+ |
+ |---New For Each(false,false)[tuple] - scope-361
+ | |
+ | Constant(1) - scope-360
+ | |
+ | Project[bag][1] - scope-358
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-357
+Tez vertex scope-366
+# Plan on vertex
+POIdentityInOutTez - scope-367 <- scope-348 -> scope-368
+| |
+| Project[int][0] - scope-298
+Tez vertex scope-368
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-300
+|
+|---New For Each(true)[tuple] - scope-371
+ | |
+ | Project[bag][1] - scope-370
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-369
+Tez vertex scope-340
+# Plan on vertex
+POValueOutputTez - scope-342 -> [scope-384, scope-343]
+|
+|---d1: Filter[bag] - scope-283
+ | |
+ | Equal To[boolean] - scope-286
+ | |
+ | |---Project[int][0] - scope-284
+ | |
+ | |---Constant(5) - scope-285
+ |
+ |---d: Filter[bag] - scope-279
+ | |
+ | Greater Than[boolean] - scope-282
+ | |
+ | |---Project[int][0] - scope-280
+ | |
+ | |---Constant(10) - scope-281
+ |
+ |---POValueInputTez - scope-341 <- scope-316
+Tez vertex scope-384
+# Plan on vertex
+POValueOutputTez - scope-390 -> [scope-388]
+|
+|---POValueInputTez - scope-385 <- scope-340
+Tez vertex scope-372
+# Plan on vertex
+f1: Local Rearrange[tuple]{tuple}(false) - scope-375 -> scope-376
+| |
+| Project[tuple][*] - scope-374
+|
+|---f1: Limit - scope-305
+ |
+ |---f: Filter[bag] - scope-301
+ | |
+ | Greater Than or Equal[boolean] - scope-304
+ | |
+ | |---Project[int][0] - scope-302
+ | |
+ | |---Constant(3) - scope-303
+ |
+ |---POValueInputTez - scope-373 <- scope-345
+Tez vertex scope-376
+# Plan on vertex
+POValueOutputTez - scope-381 -> [scope-386, scope-382]
+|
+|---f1: Limit - scope-380
+ |
+ |---f1: New For Each(true)[bag] - scope-379
+ | |
+ | Project[tuple][1] - scope-378
+ |
+ |---f1: Package(Packager)[tuple]{tuple} - scope-377
+Tez vertex scope-386
+# Plan on vertex
+POValueOutputTez - scope-391 -> [scope-388]
+|
+|---POValueInputTez - scope-387 <- scope-376
+Tez vertex scope-388
+# Plan on vertex
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-315
+|
+|---POShuffledValueInputTez - scope-389 <- [scope-386, scope-384]
+Tez vertex scope-329
+# Plan on vertex
+POValueOutputTez - scope-331 -> [scope-337, scope-332]
+|
+|---c: Filter[bag] - scope-244
+ | |
+ | Less Than or Equal[boolean] - scope-247
+ | |
+ | |---Project[int][0] - scope-245
+ | |
+ | |---Constant(10) - scope-246
+ |
+ |---POValueInputTez - scope-330 <- scope-316
+Tez vertex scope-337
+# Plan on vertex
+c2: Local Rearrange[tuple]{int}(false) - scope-404 -> scope-339
+| |
+| Project[int][0] - scope-406
+|
+|---c3: New For Each(false,false)[bag] - scope-392
+ | |
+ | Project[int][0] - scope-393
+ | |
+ | POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-394
+ | |
+ | |---Project[bag][0] - scope-395
+ | |
+ | |---Project[bag][1] - scope-396
+ |
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-407
+ |
+ |---POValueInputTez - scope-338 <- scope-329
+Tez vertex scope-339
+# Combine plan on edge <scope-337>
+c2: Local Rearrange[tuple]{int}(false) - scope-408 -> scope-339
+| |
+| Project[int][0] - scope-410
+|
+|---c3: New For Each(false,false)[bag] - scope-397
+ | |
+ | Project[int][0] - scope-398
+ | |
+ | POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-399
+ | |
+ | |---Project[bag][1] - scope-400
+ |
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-403
+# Plan on vertex
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-278
+|
+|---c3: New For Each(false,false)[bag] - scope-277
+ | |
+ | Project[int][0] - scope-271
+ | |
+ | POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-275
+ | |
+ | |---Project[bag][1] - scope-401
+ |
+ |---c2: Package(CombinerPackager)[tuple]{int} - scope-268
+Tez vertex scope-382
+# Plan on vertex
+f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-309
+|
+|---POValueInputTez - scope-383 <- scope-376
+Tez vertex scope-321
+# Plan on vertex
+b1: Local Rearrange[tuple]{int}(false) - scope-228 -> scope-323
+| |
+| Project[int][0] - scope-229
+|
+|---POValueInputTez - scope-322 <- scope-318
+Tez vertex scope-323
+# Plan on vertex
+POValueOutputTez - scope-324 -> [scope-327, scope-325]
+|
+|---b1: Package(Packager)[tuple]{int} - scope-227
+Tez vertex scope-327
+# Plan on vertex
+b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-243
+|
+|---b2: New For Each(false,false)[bag] - scope-242
+ | |
+ | Project[int][0] - scope-236
+ | |
+ | POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-240
+ | |
+ | |---Project[bag][0] - scope-239
+ | |
+ | |---Project[bag][1] - scope-238
+ |
+ |---POValueInputTez - scope-328 <- scope-323
+Tez vertex scope-325
+# Plan on vertex
+b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-233
+|
+|---POValueInputTez - scope-326 <- scope-323
+Tez vertex scope-343
+# Plan on vertex
+d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-290
+|
+|---POValueInputTez - scope-344 <- scope-340
+Tez vertex scope-332
+# Plan on vertex
+c1: Local Rearrange[tuple]{int}(false) - scope-257 -> scope-336
+| |
+| Project[int][0] - scope-258
+|
+|---POValueInputTez - scope-333 <- scope-329
+Tez vertex scope-334
+# Plan on vertex
+c1: Local Rearrange[tuple]{int}(false) - scope-259 -> scope-336
+| |
+| Project[int][0] - scope-260
+|
+|---POValueInputTez - scope-335 <- scope-318
+Tez vertex scope-336
+# Plan on vertex
+c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-264
+|
+|---c1: New For Each(true,true)[tuple] - scope-263
+ | |
+ | Project[bag][1] - scope-261
+ | |
+ | Project[bag][2] - scope-262
+ |
+ |---c1: Package(Packager)[tuple]{int} - scope-256