You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/07 22:10:23 UTC
svn commit: r1637449 [2/4] - in /pig/branches/branch-0.14: ./
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/b...
Added: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1637449&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Nov 7 21:10:21 2014
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.tez;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import com.google.common.collect.Maps;
+
+/**
+ * TezPigScriptStats encapsulates the statistics collected from a running script executed in Tez mode.
+ * It includes the status of the execution, the Tez DAGs launched for the script, as well as
+ * information about outputs and inputs of the script.
+ *
+ * <p>TezPigScriptStats encapsulates multiple {@link TezDAGStats}; each TezDAGStats encapsulates
+ * multiple {@link TezVertexStats}.
+ */
+public class TezPigScriptStats extends PigStats {
+    private static final Log LOG = LogFactory.getLog(TezPigScriptStats.class);
+
+    private TezScriptState tezScriptState;
+
+    /**
+     * DAG name (the operator key of the plan container node) to the statistics of that DAG.
+     * Insertion-ordered so that iteration (e.g. in display()) follows the order in which
+     * DAGGraphBuilder registered the DAGs rather than arbitrary hash order.
+     */
+    private Map<String, TezDAGStats> tezDAGStatsMap;
+
+    /**
+     * Builds the graph of Tez DAGs to be executed: one {@link TezDAGStats} per
+     * {@link TezPlanContainerNode}, connected in {@code jobPlan} to the stats of its predecessors.
+     */
+    private class DAGGraphBuilder extends TezPlanContainerVisitor {
+        public DAGGraphBuilder(TezPlanContainer planContainer) {
+            super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+        }
+
+        @Override
+        public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
+            TezScriptState ss = TezScriptState.get();
+            TezDAGScriptInfo dagScriptInfo = ss.setDAGScriptInfo(tezPlanNode);
+            TezDAGStats.JobGraphBuilder jobGraphBuilder = new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), dagScriptInfo);
+            jobGraphBuilder.visit();
+            TezDAGStats currStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobGraphBuilder.getJobPlan(), jobGraphBuilder.getTezVertexStatsMap());
+            currStats.setAlias(dagScriptInfo);
+            jobPlan.add(currStats);
+            List<TezPlanContainerNode> preds = getPlan().getPredecessors(tezPlanNode);
+            if (preds != null) {
+                for (TezPlanContainerNode pred : preds) {
+                    // Predecessor stats are expected to be registered already, since the
+                    // DependencyOrderWalker is relied on to visit predecessors first.
+                    TezDAGStats predStats = tezDAGStatsMap.get(pred.getOperatorKey().toString());
+                    if (!jobPlan.isConnected(predStats, currStats)) {
+                        jobPlan.connect(predStats, currStats);
+                    }
+                }
+            }
+            tezDAGStatsMap.put(tezPlanNode.getOperatorKey().toString(), currStats);
+        }
+    }
+
+    /**
+     * Creates empty script statistics bound to the current {@link TezScriptState}.
+     *
+     * @param pigContext the context of the script being run
+     */
+    public TezPigScriptStats(PigContext pigContext) {
+        this.pigContext = pigContext;
+        this.jobPlan = new JobGraph();
+        this.tezDAGStatsMap = Maps.newLinkedHashMap();
+        this.tezScriptState = (TezScriptState) ScriptState.get();
+    }
+
+    /**
+     * Marks the script as started and builds the DAG statistics graph for the given plan.
+     * A failure to build the graph is logged and otherwise ignored.
+     *
+     * @param tezPlanContainer the container holding all Tez DAG plans of the script
+     */
+    public void initialize(TezPlanContainer tezPlanContainer) {
+        super.start();
+        try {
+            new DAGGraphBuilder(tezPlanContainer).visit();
+        } catch (FrontendException e) {
+            LOG.warn("Unable to build Tez DAG", e);
+        }
+    }
+
+    /**
+     * Marks the script as stopped and logs the script statistics summary.
+     */
+    public void finish() {
+        super.stop();
+        display();
+    }
+
+    /**
+     * Logs (at INFO) a human-readable summary: versions, user, timing, overall status,
+     * error message on failure, per-DAG details, and the script's inputs and outputs.
+     */
+    private void display() {
+        SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n");
+        sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", getHadoopVersion()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", getPigVersion()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileName", getFileName()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
+        sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
+        sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeatures()));
+        sb.append("\n");
+        if (returnCode == ReturnCode.SUCCESS) {
+            sb.append("Success!\n");
+        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Some tasks have failed! Stop running all dependent tasks\n");
+        } else {
+            sb.append("Failed!\n");
+        }
+        sb.append("\n");
+
+        // Print diagnostic info in case of failure; the first line is always printed
+        // (labelled), subsequent blank lines are skipped.
+        if (returnCode == ReturnCode.FAILURE
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            if (errorMessage != null) {
+                String[] lines = errorMessage.split("\n");
+                for (int i = 0; i < lines.length; i++) {
+                    String s = lines[i].trim();
+                    if (i == 0 || !StringUtils.isEmpty(s)) {
+                        sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+                    }
+                }
+                sb.append("\n");
+            }
+        }
+
+        for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            sb.append(dagStats.getDisplayString());
+            sb.append("\n");
+        }
+
+        sb.append("Input(s):\n");
+        for (InputStats is : getInputStats()) {
+            sb.append(is.getDisplayString().trim()).append("\n");
+        }
+        sb.append("\n");
+        sb.append("Output(s):\n");
+        for (OutputStats os : getOutputStats()) {
+            sb.append(os.getDisplayString().trim()).append("\n");
+        }
+        LOG.info("Script Statistics:\n" + sb.toString());
+    }
+
+    /**
+     * Updates the statistics after a DAG is finished and emits the matching progress
+     * notifications.
+     *
+     * <p>A missing DAG status marks the DAG as failed. A SUCCEEDED DAG is marked successful;
+     * a FAILED, KILLED or ERROR DAG is marked failed with the job diagnostics as error message.
+     * A job whose name was never registered by {@link #initialize} is logged and ignored.
+     *
+     * @param tezJob the finished job
+     * @throws IOException if accumulating the DAG statistics fails
+     */
+    public void accumulateStats(TezJob tezJob) throws IOException {
+        DAGStatus dagStatus = tezJob.getDAGStatus();
+        String dagName = tezJob.getName();
+        TezDAGStats tezDAGStats = tezDAGStatsMap.get(dagName);
+        if (tezDAGStats == null) {
+            LOG.warn("No statistics registered for DAG " + dagName + ", skipping stats accumulation");
+            return;
+        }
+        if (dagStatus == null) {
+            tezDAGStats.setSuccessful(false);
+            tezScriptState.emitJobFailedNotification(tezDAGStats);
+            return;
+        }
+
+        tezDAGStats.accumulateStats(tezJob);
+        for (OutputStats output : tezDAGStats.getOutputs()) {
+            tezScriptState.emitOutputCompletedNotification(output);
+        }
+        DAGStatus.State state = dagStatus.getState();
+        if (state == DAGStatus.State.SUCCEEDED) {
+            tezDAGStats.setSuccessful(true);
+            tezScriptState.emitjobFinishedNotification(tezDAGStats);
+        } else if (state == DAGStatus.State.FAILED
+                || state == DAGStatus.State.KILLED
+                || state == DAGStatus.State.ERROR) {
+            // Killed and errored DAGs did not succeed either; report them as failures
+            // instead of leaving their success flag unset.
+            tezDAGStats.setSuccessful(false);
+            tezDAGStats.setErrorMsg(tezJob.getDiagnostics());
+            tezScriptState.emitJobFailedNotification(tezDAGStats);
+        }
+        tezScriptState.dagCompletedNotification(dagName, tezDAGStats);
+    }
+
+    /**
+     * Records the outcome of a native (user-supplied) Tez job.
+     *
+     * @param dagName name of the DAG the native operator belongs to; assumed to be
+     *                registered by {@link #initialize}
+     * @param tezOper the native operator, whose job id is recorded
+     * @param success whether the native job succeeded
+     * @return the updated DAG statistics
+     */
+    public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper tezOper, boolean success) {
+        TezDAGStats js = tezDAGStatsMap.get(dagName);
+        js.setJobId(tezOper.getJobId());
+        js.setSuccessful(success);
+        return js;
+    }
+
+    /**
+     * Looks up the statistics of one vertex.
+     *
+     * @return the vertex statistics, or {@code null} if the DAG is unknown
+     */
+    public TezVertexStats getVertexStats(String dagName, String vertexName) {
+        TezDAGStats tezDAGStats = tezDAGStatsMap.get(dagName);
+        return tezDAGStats == null ? null : tezDAGStats.getVertexStats(vertexName);
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    @Override
+    public JobClient getJobClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Sum of the spillable-memory-manager spill counts over all DAGs. */
+    @Override
+    public long getSMMSpillCount() {
+        long ret = 0;
+        for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            ret += dagStats.getSMMSpillCount();
+        }
+        return ret;
+    }
+
+    /** Sum of the proactive spill counts (objects) over all DAGs. */
+    @Override
+    public long getProactiveSpillCountObjects() {
+        long ret = 0;
+        for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            ret += dagStats.getProactiveSpillCountObjects();
+        }
+        return ret;
+    }
+
+    /** Sum of the proactive spill counts (records) over all DAGs. */
+    @Override
+    public long getProactiveSpillCountRecords() {
+        long ret = 0;
+        for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            ret += dagStats.getProactiveSpillCountRecs();
+        }
+        return ret;
+    }
+
+}
Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Nov 7 21:10:21 2014
@@ -20,9 +20,10 @@ package org.apache.pig.tools.pigstats.te
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,15 +32,21 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -52,9 +59,8 @@ import com.google.common.collect.Maps;
public class TezScriptState extends ScriptState {
private static final Log LOG = LogFactory.getLog(TezScriptState.class);
- private Map<TezOperator, String> featureMap = null;
- private Map<TezOperator, String> aliasMap = Maps.newHashMap();
- private Map<TezOperator, String> aliasLocationMap = Maps.newHashMap();
+ private List<PigTezProgressNotificationListener> tezListeners = Lists.newArrayList();
+ private Map<String, TezDAGScriptInfo> dagScriptInfo = Maps.newHashMap();
public TezScriptState(String id) {
super(id);
@@ -64,13 +70,48 @@ public class TezScriptState extends Scri
return (TezScriptState) ScriptState.get();
}
- public void addSettingsToConf(TezOperator tezOp, Configuration conf) {
+ @Override
+ public void registerListener(PigProgressNotificationListener listener) {
+ super.registerListener(listener);
+ if (listener instanceof PigTezProgressNotificationListener) {
+ tezListeners.add((PigTezProgressNotificationListener) listener);
+ }
+ }
+
+ public void dagLaunchNotification(String dagName, OperatorPlan<?> dagPlan, int numVerticesToLaunch) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagLaunchNotification(id, dagName, dagPlan, numVerticesToLaunch);
+ }
+ }
+
+ public void dagStartedNotification(String dagName, String assignedApplicationId) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagStartedNotification(id, dagName, assignedApplicationId);
+ }
+ }
+
+ public void dagProgressNotification(String dagName, int numVerticesCompleted, int progress) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagProgressNotification(id, dagName, numVerticesCompleted, progress);
+ }
+ }
+
+ public void dagCompletedNotification(String dagName, TezDAGStats tezDAGStats) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagCompletedNotification(id, dagName, tezDAGStats.isSuccessful(), tezDAGStats);
+ }
+ }
+
+ public void addDAGSettingsToConf(Configuration conf) {
LOG.info("Pig script settings are added to the job");
conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+ }
+
+ public void addVertexSettingsToConf(String dagName, TezOperator tezOp, Configuration conf) {
try {
List<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
@@ -95,8 +136,8 @@ public class TezScriptState extends Scri
LOG.warn("unable to get the map loads", e);
}
- setPigFeature(tezOp, conf);
- setJobParents(tezOp, conf);
+ setPigFeature(dagName, tezOp, conf);
+ setJobParents(dagName, tezOp, conf);
conf.set("mapreduce.workflow.id", "pig_" + id);
conf.set("mapreduce.workflow.name", getFileName().isEmpty() ? "default" : getFileName());
@@ -116,32 +157,30 @@ public class TezScriptState extends Scri
}
}
- private void setPigFeature(TezOperator tezOp, Configuration conf) {
- conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(tezOp));
+ private void setPigFeature(String dagName, TezOperator tezOp, Configuration conf) {
+ if (tezOp.isVertexGroup()) {
+ return;
+ }
+ TezDAGScriptInfo dagInfo = getDAGScriptInfo(dagName);
+ conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), dagInfo.getPigFeatures(tezOp));
if (scriptFeatures != 0) {
conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
String.valueOf(scriptFeatures));
}
- conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(tezOp));
- conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(tezOp));
+ conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), dagInfo.getAlias(tezOp));
+ conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), dagInfo.getAliasLocation(tezOp));
}
- private void setJobParents(TezOperator tezOp, Configuration conf) {
+ private void setJobParents(String dagName, TezOperator tezOp, Configuration conf) {
+ if (tezOp.isVertexGroup()) {
+ return;
+ }
// PigStats maintains a job DAG with the job id being updated
// upon available. Therefore, before a job is submitted, the ids
// of its parent jobs are already available.
- JobGraph jg = PigStats.get().getJobGraph();
- JobStats js = null;
- Iterator<JobStats> iter = jg.iterator();
- while (iter.hasNext()) {
- JobStats job = iter.next();
- if (job.getName().equals(tezOp.getOperatorKey().toString())) {
- js = job;
- break;
- }
- }
+ JobStats js = ((TezPigScriptStats)PigStats.get()).getVertexStats(dagName, tezOp.getOperatorKey().toString());
if (js != null) {
- List<Operator> preds = jg.getPredecessors(js);
+ List<Operator> preds = js.getPlan().getPredecessors(js);
if (preds != null) {
StringBuilder sb = new StringBuilder();
for (Operator op : preds) {
@@ -154,87 +193,173 @@ public class TezScriptState extends Scri
}
}
- public String getAlias(TezOperator tezOp) {
- if (!aliasMap.containsKey(tezOp)) {
- setAlias(tezOp);
- }
- return aliasMap.get(tezOp);
+ public TezDAGScriptInfo setDAGScriptInfo(TezPlanContainerNode tezPlanNode) {
+ TezDAGScriptInfo info = new TezDAGScriptInfo(tezPlanNode.getTezOperPlan());
+ dagScriptInfo.put(tezPlanNode.getOperatorKey().toString(), info);
+ return info;
}
- private void setAlias(TezOperator tezOp) {
- ArrayList<String> alias = new ArrayList<String>();
- String aliasLocationStr = "";
- try {
- ArrayList<String> aliasLocation = new ArrayList<String>();
- new AliasVisitor(tezOp.plan, alias, aliasLocation).visit();
- aliasLocationStr += LoadFunc.join(aliasLocation, ",");
- if (!alias.isEmpty()) {
- Collections.sort(alias);
- }
- } catch (VisitorException e) {
- LOG.warn("unable to get alias", e);
- }
- aliasMap.put(tezOp, LoadFunc.join(alias, ","));
- aliasLocationMap.put(tezOp, aliasLocationStr);
+ public TezDAGScriptInfo getDAGScriptInfo(String dagName) {
+ return dagScriptInfo.get(dagName);
}
- public String getAliasLocation(TezOperator tezOp) {
- if (!aliasLocationMap.containsKey(tezOp)) {
- setAlias(tezOp);
- }
- return aliasLocationMap.get(tezOp);
- }
+ static class TezDAGScriptInfo {
- public String getPigFeature(TezOperator tezOp) {
- if (featureMap == null) {
- featureMap = Maps.newHashMap();
- }
+ private static final Log LOG = LogFactory.getLog(TezDAGScriptInfo.class);
+ private TezOperPlan tezPlan;
+ private String alias;
+ private String aliasLocation;
+ private String features;
+
+ private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+ class DAGAliasVisitor extends TezOpPlanVisitor {
+
+ private Set<String> aliases;
+ private Set<String> aliasLocations;
+ private BitSet featureSet;
+
+ public DAGAliasVisitor(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ this.aliases = new HashSet<String>();
+ this.aliasLocations = new HashSet<String>();
+ this.featureSet = new BitSet();
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ if (tezOp.isVertexGroup()) {
+ featureSet.set(PIG_FEATURE.UNION.ordinal());
+ return;
+ }
+ ArrayList<String> aliasList = new ArrayList<String>();
+ String aliasLocationStr = "";
+ try {
+ ArrayList<String> aliasLocationList = new ArrayList<String>();
+ new AliasVisitor(tezOp.plan, aliasList, aliasLocationList).visit();
+ aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+ if (!aliasList.isEmpty()) {
+ Collections.sort(aliasList);
+ aliases.addAll(aliasList);
+ aliasLocations.addAll(aliasLocationList);
+ }
+ } catch (VisitorException e) {
+ LOG.warn("unable to get alias", e);
+ }
+ aliasMap.put(tezOp.getOperatorKey(), LoadFunc.join(aliasList, ","));
+ aliasLocationMap.put(tezOp.getOperatorKey(), aliasLocationStr);
- String retStr = featureMap.get(tezOp);
- if (retStr == null) {
- BitSet feature = new BitSet();
- feature.clear();
- if (tezOp.isSkewedJoin()) {
- feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
- }
- if (tezOp.isGlobalSort()) {
- feature.set(PIG_FEATURE.ORDER_BY.ordinal());
- }
- if (tezOp.isSampler()) {
- feature.set(PIG_FEATURE.SAMPLER.ordinal());
- }
- if (tezOp.isIndexer()) {
- feature.set(PIG_FEATURE.INDEXER.ordinal());
- }
- if (tezOp.isCogroup()) {
- feature.set(PIG_FEATURE.COGROUP.ordinal());
- }
- if (tezOp.isGroupBy()) {
- feature.set(PIG_FEATURE.GROUP_BY.ordinal());
- }
- if (tezOp.isRegularJoin()) {
- feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
- }
- if (tezOp.isUnion()) {
- feature.set(PIG_FEATURE.UNION.ordinal());
+
+ BitSet feature = new BitSet();
+ feature.clear();
+ if (tezOp.isSkewedJoin()) {
+ feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+ }
+ if (tezOp.isGlobalSort()) {
+ feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+ }
+ if (tezOp.isSampler()) {
+ feature.set(PIG_FEATURE.SAMPLER.ordinal());
+ }
+ if (tezOp.isIndexer()) {
+ feature.set(PIG_FEATURE.INDEXER.ordinal());
+ }
+ if (tezOp.isCogroup()) {
+ feature.set(PIG_FEATURE.COGROUP.ordinal());
+ }
+ if (tezOp.isGroupBy()) {
+ feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+ }
+ if (tezOp.isRegularJoin()) {
+ feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ }
+ if (tezOp.isUnion()) {
+ feature.set(PIG_FEATURE.UNION.ordinal());
+ }
+ if (tezOp.isNative()) {
+ feature.set(PIG_FEATURE.NATIVE.ordinal());
+ }
+ if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
+ feature.set(PIG_FEATURE.LIMIT.ordinal());
+ }
+ try {
+ new FeatureVisitor(tezOp.plan, feature).visit();
+ } catch (VisitorException e) {
+ LOG.warn("Feature visitor failed", e);
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ featuresMap.put(tezOp.getOperatorKey(), sb.toString());
+ for (int i=0; i < feature.length(); i++) {
+ if (feature.get(i)) {
+ featureSet.set(i);
+ }
+ }
}
- if (tezOp.isNative()) {
- feature.set(PIG_FEATURE.NATIVE.ordinal());
+
+ @Override
+ public void visit() throws VisitorException {
+ super.visit();
+ if (!aliases.isEmpty()) {
+ ArrayList<String> aliasList = new ArrayList<String>(aliases);
+ ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations);
+ Collections.sort(aliasList);
+ Collections.sort(aliasLocationList);
+ alias = LoadFunc.join(aliasList, ",");
+ aliasLocation = LoadFunc.join(aliasLocationList, ",");
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i+1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ features = sb.toString();
}
+
+ }
+
+ public TezDAGScriptInfo(TezOperPlan tezPlan) {
+ this.tezPlan = tezPlan;
+ initialize();
+ }
+
+ private void initialize() {
try {
- new FeatureVisitor(tezOp.plan, feature).visit();
+ new DAGAliasVisitor(tezPlan).visit();
} catch (VisitorException e) {
- LOG.warn("Feature visitor failed", e);
+ LOG.warn("Cannot calculate alias information for DAG", e);
}
- StringBuilder sb = new StringBuilder();
- for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
- if (sb.length() > 0) sb.append(",");
- sb.append(PIG_FEATURE.values()[i].name());
- }
- retStr = sb.toString();
- featureMap.put(tezOp, retStr);
}
- return retStr;
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getAliasLocation() {
+ return aliasLocation;
+ }
+
+ public String getPigFeatures() {
+ return features;
+ }
+
+ public String getAlias(TezOperator tezOp) {
+ return aliasMap.get(tezOp.getOperatorKey());
+ }
+
+ public String getAliasLocation(TezOperator tezOp) {
+ return aliasLocationMap.get(tezOp.getOperatorKey());
+ }
+
+ public String getPigFeatures(TezOperator tezOp) {
+ return featuresMap.get(tezOp.getOperatorKey());
+ }
+
}
}
Added: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1637449&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Nov 7 21:10:21 2014
@@ -0,0 +1,368 @@
+package org.apache.pig.tools.pigstats.tez;
+
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.FS_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.PIG_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.PigCounters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import com.google.common.collect.Maps;
+
+/**
+ * TezVertexStats encapsulates the statistics collected from a Tez vertex.
+ * It includes the status of the execution as well as information about
+ * the outputs and inputs of the vertex.
+ */
+public class TezVertexStats extends JobStats {
+    private static final Log LOG = LogFactory.getLog(TezVertexStats.class);
+
+    // When true, parallelism and record counts of this vertex are added to the
+    // map-side totals; otherwise to the reduce-side totals.
+    private boolean isMapOpts;
+    private int parallelism;
+    // CounterGroup, Counter, Value
+    private Map<String, Map<String, Long>> counters = null;
+
+    private List<POStore> stores = null;
+    private List<FileSpec> loads = null;
+
+    private int numberMaps = 0;
+    private int numberReduces = 0;
+
+    private long mapInputRecords = 0;
+    private long mapOutputRecords = 0;
+    private long reduceInputRecords = 0;
+    private long reduceOutputRecords = 0;
+    private long spillCount = 0;
+    private long activeSpillCountObj = 0;
+    private long activeSpillCountRecs = 0;
+
+    private HashMap<String, Long> multiStoreCounters
+            = new HashMap<String, Long>();
+
+    public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
+        super(name, plan);
+        this.isMapOpts = isMapOpts;
+    }
+
+    /** The vertex name doubles as its job id. */
+    @Override
+    public String getJobId() {
+        return name;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (v instanceof JobGraphPrinter) {
+            JobGraphPrinter jpp = (JobGraphPrinter)v;
+            jpp.visit(this);
+        }
+    }
+
+    @Override
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("%1$20s: %2$-100s%n", "VertexName", name));
+        if (getAlias() != null && !getAlias().isEmpty()) {
+            sb.append(String.format("%1$20s: %2$-100s%n", "Alias", getAlias()));
+        }
+        if (getFeature() != null && !getFeature().isEmpty()) {
+            sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeature()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Stores the configuration and deserializes the vertex's store list
+     * (from {@code JobControlCompiler.PIG_REDUCE_STORES}) and load list (from
+     * {@code pig.inputs}). Deserialization failures are logged, not thrown.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        try {
+            // TODO: We should replace PIG_REDUCE_STORES with something else in
+            // tez. For now, we keep it since it's used in PigOutputFormat.
+            this.stores = (List<POStore>) ObjectSerializer.deserialize(
+                    conf.get(JobControlCompiler.PIG_REDUCE_STORES));
+            this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
+                    conf.get("pig.inputs"));
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize the store list", e);
+        }
+    }
+
+    /** Returns true if the vertex has at least one load or one store. */
+    public boolean hasLoadOrStore() {
+        return (loads != null && !loads.isEmpty())
+                || (stores != null && !stores.isEmpty());
+    }
+
+    /**
+     * Captures the final status and counters of the vertex and derives the HDFS byte
+     * counts, spill counts and input/output statistics from them.
+     *
+     * @param status      the vertex status; when {@code null} only the HDFS byte counts
+     *                    are reset to -1
+     * @param parallelism the number of tasks the vertex ran with
+     */
+    public void accumulateStats(VertexStatus status, int parallelism) {
+        hdfsBytesRead = -1;
+        hdfsBytesWritten = -1;
+
+        if (status == null) {
+            return;
+        }
+
+        setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED));
+        this.parallelism = parallelism;
+        if (this.isMapOpts) {
+            numberMaps += parallelism;
+        } else {
+            numberReduces += parallelism;
+        }
+
+        counters = Maps.newHashMap();
+        TezCounters tezCounters = status.getVertexCounters();
+        if (tezCounters != null) {
+            Iterator<CounterGroup> grpIt = tezCounters.iterator();
+            while (grpIt.hasNext()) {
+                CounterGroup grp = grpIt.next();
+                Iterator<TezCounter> cntIt = grp.iterator();
+                Map<String, Long> cntMap = Maps.newHashMap();
+                while (cntIt.hasNext()) {
+                    TezCounter cnt = cntIt.next();
+                    cntMap.put(cnt.getName(), cnt.getValue());
+                }
+                counters.put(grp.getName(), cntMap);
+            }
+        } else {
+            // Leave an empty counter map rather than failing on a status without counters.
+            LOG.warn("No counters available for vertex " + name);
+        }
+
+        Long bytesRead = findCounterValue(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ);
+        if (bytesRead != null) {
+            hdfsBytesRead = bytesRead;
+        }
+        Long bytesWritten = findCounterValue(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN);
+        if (bytesWritten != null) {
+            hdfsBytesWritten = bytesWritten;
+        }
+
+        // The counter maps are keyed by String, so look the Pig counters up by the enum
+        // constant's name() (as done for TaskCounter below); an enum key never matches.
+        Long smmSpills = findCounterValue(PIG_COUNTER_GROUP,
+                PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT.name());
+        if (smmSpills != null) {
+            spillCount = smmSpills;
+        }
+        Long spilledBags = findCounterValue(PIG_COUNTER_GROUP,
+                PigCounters.PROACTIVE_SPILL_COUNT_BAGS.name());
+        if (spilledBags != null) {
+            activeSpillCountObj = spilledBags;
+        }
+        Long spilledRecs = findCounterValue(PIG_COUNTER_GROUP,
+                PigCounters.PROACTIVE_SPILL_COUNT_RECS.name());
+        if (spilledRecs != null) {
+            activeSpillCountRecs = spilledRecs;
+        }
+
+        addInputStatistics();
+        addOutputStatistics();
+    }
+
+    /**
+     * Returns the value of counter {@code counterName} in group {@code groupName} from the
+     * counters captured by {@link #accumulateStats}, or {@code null} when the counters, the
+     * group or the counter are not available.
+     */
+    private Long findCounterValue(String groupName, String counterName) {
+        if (counters == null) {
+            return null;
+        }
+        Map<String, Long> group = counters.get(groupName);
+        return group == null ? null : group.get(counterName);
+    }
+
+    public Map<String, Map<String, Long>> getCounters() {
+        return counters;
+    }
+
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    /**
+     * Adds one {@link InputStats} per load of the vertex. Record and byte counts are
+     * taken from the vertex-wide counters (-1 when unavailable).
+     */
+    public void addInputStatistics() {
+        if (loads == null) {
+            return;
+        }
+
+        for (FileSpec fs : loads) {
+            long records = -1;
+            long bytesRead = -1;
+            String filename = fs.getFileName();
+            Long inputRecords = findCounterValue(TASK_COUNTER_GROUP,
+                    TaskCounter.INPUT_RECORDS_PROCESSED.name());
+            if (inputRecords != null) {
+                records = inputRecords;
+                if (this.isMapOpts) {
+                    mapInputRecords += records;
+                } else {
+                    reduceInputRecords += records;
+                }
+            }
+            Long readBytes = findCounterValue(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ);
+            if (readBytes != null) {
+                bytesRead = readBytes;
+            }
+            InputStats is = new InputStats(filename, bytesRead,
+                    records, (state == JobState.SUCCESS));
+            is.setConf(conf);
+            inputs.add(is);
+        }
+    }
+
+    /**
+     * Adds one {@link OutputStats} per non-temporary store of the vertex. Multi-store
+     * record counts come from the multi-store counter group; other stores use the
+     * vertex OUTPUT_RECORDS counter. Byte counts fall back to the size of the output
+     * when no HDFS counter applies (including when no counters were captured).
+     */
+    public void addOutputStatistics() {
+        if (stores == null) {
+            return;
+        }
+
+        for (POStore sto : stores) {
+            if (sto.isTmpStore()) {
+                continue;
+            }
+            long records = -1;
+            String filename = sto.getSFile().getFileName();
+            if (counters != null) {
+                if (sto.isMultiStore()) {
+                    Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+                    if (msGroup != null) {
+                        multiStoreCounters.putAll(msGroup);
+                        Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
+                        if (n != null) {
+                            records = n;
+                        }
+                    }
+                } else {
+                    Long outputRecords = findCounterValue(TASK_COUNTER_GROUP,
+                            TaskCounter.OUTPUT_RECORDS.name());
+                    if (outputRecords != null) {
+                        records = outputRecords;
+                    }
+                }
+                if (records != -1) {
+                    if (this.isMapOpts) {
+                        mapOutputRecords += records;
+                    } else {
+                        reduceOutputRecords += records;
+                    }
+                }
+            }
+            /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
+            // findCounterValue() is null-safe, so a vertex without counters falls back to
+            // the output size instead of failing.
+            Long writtenBytes = sto.isMultiStore()
+                    ? null
+                    : findCounterValue(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN);
+            long bytesWritten = (writtenBytes != null)
+                    ? writtenBytes
+                    : JobStats.getOutputSize(sto, conf);
+
+            OutputStats os = new OutputStats(filename, bytesWritten,
+                    records, (state == JobState.SUCCESS));
+            os.setPOStore(sto);
+            os.setConf(conf);
+            outputs.add(os);
+        }
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberMaps() {
+        return numberMaps;
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberReduces() {
+        return numberReduces;
+    }
+
+    /** Not tracked in Tez mode; always -1. */
+    @Override
+    @Deprecated
+    public long getMaxMapTime() {
+        return -1;
+    }
+
+    /** Not tracked in Tez mode; always -1. */
+    @Override
+    @Deprecated
+    public long getMinMapTime() {
+        return -1;
+    }
+
+    /** Not tracked in Tez mode; always -1. */
+    @Override
+    @Deprecated
+    public long getAvgMapTime() {
+        return -1;
+    }
+
+    /** Not tracked in Tez mode; always -1. */
+    @Override
+    @Deprecated
+    public long getMaxReduceTime() {
+        return -1;
+    }
+
+    /** Not tracked in Tez mode; always -1. */
+    @Override
+    @Deprecated
+    public long getMinReduceTime() {
+        return -1;
+    }
+
+    /** Not tracked in Tez mode; always -1. (Name mirrors the inherited method.) */
+    @Override
+    @Deprecated
+    public long getAvgREduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapOutputRecords() {
+        return mapOutputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceInputRecords() {
+        return reduceInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceOutputRecords() {
+        return reduceOutputRecords;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        return spillCount;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        return activeSpillCountObj;
+    }
+
+    @Override
+    public long getProactiveSpillCountRecs() {
+        return activeSpillCountRecs;
+    }
+
+    @Override
+    @Deprecated
+    public Counters getHadoopCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiStoreCounters() {
+        return multiStoreCounters;
+    }
+
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+}
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java Fri Nov 7 21:10:21 2014
@@ -45,23 +45,25 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestPigRunner {
- private static MiniCluster cluster;
+ private static MiniGenericCluster cluster;
+ private static String execType;
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
@@ -69,7 +71,8 @@ public class TestPigRunner {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- cluster = MiniCluster.buildCluster();
+ cluster = MiniGenericCluster.buildCluster();
+ execType = cluster.getExecType().name().toLowerCase();
PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
w.println("1\t2\t3");
w.println("5\t3\t4");
@@ -89,6 +92,7 @@ public class TestPigRunner {
@Before
public void setUp() {
deleteAll(new File(OUTPUT_FILE));
+ Util.resetStateForExecModeSwitch();
}
@Test
@@ -154,8 +158,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -189,8 +193,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats instanceof EmptyPigStats);
assertTrue(stats.isSuccessful());
@@ -220,8 +224,8 @@ public class TestPigRunner {
Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
try {
- String[] args = { inputInDfs.toString() };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, inputInDfs.toString() };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -248,11 +252,17 @@ public class TestPigRunner {
w.println("C = limit B 2;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
try {
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 4);
+ if (execType.equals("tez")) {
+ assertEquals(stats.getJobGraph().size(), 1);
+ // 5 vertices
+ assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+ } else {
+ assertEquals(stats.getJobGraph().size(), 4);
+ }
assertTrue(stats.getJobGraph().getSinks().size() == 1);
assertTrue(stats.getJobGraph().getSources().size() == 1);
JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
@@ -263,11 +273,20 @@ public class TestPigRunner {
assertEquals(2, stats.getRecordWritten());
assertEquals(12, stats.getBytesWritten());
- assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
- 0)).getAlias());
- assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
- js).get(0)).getAlias());
- assertEquals("B", js.getAlias());
+ if (execType.equals("tez")) {
+ assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ // TODO: alias is not set for sample-aggregation/partition/sort job.
+ // Need to investigate
+ // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+ // js).get(0)).getAlias());
+ } else {
+ assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+ js).get(0)).getAlias());
+ assertEquals("B", js.getAlias());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -287,8 +306,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertTrue(stats.getJobGraph().size() == 1);
// Each output file should include the following:
@@ -336,10 +355,11 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 1);
+ assertEquals(stats.getJobGraph().size(), 1);
+
// Each output file should include the following:
// output:
// 5\t3\t4\n
@@ -383,15 +403,19 @@ public class TestPigRunner {
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
Iterator<JobStats> iter = stats.getJobGraph().iterator();
while (iter.hasNext()) {
JobStats js=iter.next();
- if(js.getState().name().equals("FAILED")) {
- List<Operator> ops=stats.getJobGraph().getSuccessors(js);
- for(Operator op : ops ) {
- assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+ if (execType.equals("tez")) {
+ assertEquals(js.getState().name(), "FAILED");
+ } else {
+ if(js.getState().name().equals("FAILED")) {
+ List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+ for(Operator op : ops ) {
+ assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+ }
}
}
}
@@ -410,7 +434,7 @@ public class TestPigRunner {
w.println("C = foreach B generate group, COUNT(A);");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
- String[] args = { "-c", PIG_FILE };
+ String[] args = { "-x", execType, "-c", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
// TODO: error message has changed. Need to catch the new message generated from the
@@ -422,22 +446,23 @@ public class TestPigRunner {
@Test
public void simpleNegativeTest2() throws Exception {
- String[] args = { "-c", "-e", "this is a test" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-c", "-e", "this is a test" };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
}
@Test
public void simpleNegativeTest3() throws Exception {
- String[] args = { "-c", "-y" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-c", "-y" };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
- assertEquals("Found unknown option (-y) at position 2",
+ assertEquals("Found unknown option (-y) at position 4",
stats.getErrorMessage());
}
@Test
- public void NagetiveTest() throws Exception {
+ public void streamNegativeTest() throws Exception {
+ Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType()));
final String OUTPUT_FILE_2 = "output2";
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
@@ -451,21 +476,32 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
- assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
- assertTrue(stats.getJobGraph().size() == 2);
- JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
- assertTrue(job.isSuccessful());
- job = (JobStats)stats.getJobGraph().getSinks().get(0);
- assertTrue(!job.isSuccessful());
- assertTrue(stats.getOutputStats().size() == 3);
- for (OutputStats output : stats.getOutputStats()) {
- if (output.getName().equals("ee")) {
+ if (execType.equals("tez")) {
+ assertTrue(stats.getReturnCode() == ReturnCode.FAILURE);
+ assertTrue(stats.getJobGraph().size() == 1);
+ JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0);
+ assertTrue(!job.isSuccessful());
+ assertTrue(stats.getOutputStats().size() == 3);
+ for (OutputStats output : stats.getOutputStats()) {
assertTrue(!output.isSuccessful());
- } else {
- assertTrue(output.isSuccessful());
+ }
+ } else {
+ assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+ assertTrue(stats.getJobGraph().size() == 2);
+ JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+ assertTrue(job.isSuccessful());
+ job = (JobStats)stats.getJobGraph().getSinks().get(0);
+ assertTrue(!job.isSuccessful());
+ assertTrue(stats.getOutputStats().size() == 3);
+ for (OutputStats output : stats.getOutputStats()) {
+ if (output.getName().equals("ee")) {
+ assertTrue(!output.isSuccessful());
+ } else {
+ assertTrue(output.isSuccessful());
+ }
}
}
} finally {
@@ -519,8 +555,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -554,8 +590,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -586,8 +622,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -613,9 +649,9 @@ public class TestPigRunner {
String jarName = Util.findPigJarName();
String[] args = { "-Dpig.additional.jars=" + jarName,
- "-Dmapred.job.queue.name=default",
+ "-Dmapred.job.queue.name=default", "-x", execType,
"-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
Util.deleteFile(cluster, OUTPUT_FILE);
PigContext ctx = stats.getPigContext();
@@ -642,8 +678,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
} finally {
new File(PIG_FILE).delete();
@@ -658,8 +694,8 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
@@ -670,7 +706,7 @@ public class TestPigRunner {
@Test // PIG-2006
public void testEmptyFile() throws IOException {
- File f1 = new File( PIG_FILE );
+ File f1 = new File(PIG_FILE);
FileWriter fw1 = new FileWriter(f1);
fw1.close();
@@ -698,7 +734,7 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
@@ -721,7 +757,7 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, null);
assertTrue(!stats.isSuccessful());
@@ -751,8 +787,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -783,8 +819,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
@@ -815,17 +851,22 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
List<InputStats> inputs = stats.getInputStats();
assertEquals(2, inputs.size());
- for (InputStats instats : inputs) {
- // the multi-input counters are disabled
- assertEquals(-1, instats.getNumberRecords());
+ if (execType.equals("tez")) {
+ assertEquals(5, inputs.get(0).getNumberRecords());
+ assertEquals(5, inputs.get(1).getNumberRecords());
+ } else {
+ for (InputStats instats : inputs) {
+ // the multi-input counters are disabled
+ assertEquals(-1, instats.getNumberRecords());
+ }
}
List<OutputStats> outputs = stats.getOutputStats();
@@ -853,27 +894,44 @@ public class TestPigRunner {
w.close();
try {
- String[] args = { PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+
+ String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP;
+ String FS_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP;
- Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
- assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
- assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
- assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
- assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
- assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-
- // Skip for hadoop 20.203+, See PIG-2446
- if (Util.isHadoop203plus())
- return;
+ if (execType.equals("tez")) {
+ Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+ assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ "INPUT_RECORDS_PROCESSED").getValue());
+ assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+ assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+ "OUTPUT_RECORDS").getValue());
+ assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+ assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ } else {
+ Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+ assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
+ assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
+ assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+ assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
+ assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+
+ // Skip for hadoop 20.203+, See PIG-2446
+ if (Util.isHadoop203plus())
+ return;
- assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
- MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -892,17 +950,22 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
- for (OutputStats outstats : outputs) {
- // the multi-output counters are disabled
- assertEquals(-1, outstats.getNumberRecords());
+ if (execType.equals("tez")) {
+ assertEquals(outputs.get(0).getNumberRecords(), 5);
+ assertEquals(outputs.get(1).getNumberRecords(), 2);
+ } else {
+ for (OutputStats outstats : outputs) {
+ // the multi-output counters are disabled
+ assertEquals(-1, outstats.getNumberRecords());
+ }
}
List<InputStats> inputs = stats.getInputStats();
@@ -937,8 +1000,8 @@ public class TestPigRunner {
w1.close();
try {
- String[] args = { "-F", PIG_FILE };
- PigStats stats = PigRunner.run(args, new TestNotificationListener());
+ String[] args = { "-x", execType, "-F", PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(!stats.isSuccessful());
@@ -967,6 +1030,15 @@ public class TestPigRunner {
private static final int JobsSubmitted = 1;
private static final int JobStarted = 2;
private static final int JobFinished = 3;
+ private String execType;
+
+ public TestNotificationListener(String execType) {
+ this.execType = execType;
+ }
+
+ public TestNotificationListener() {
+ this.execType = "mr";
+ }
@Override
public void initialPlanNotification(String id, OperatorPlan<?> plan) {
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java Fri Nov 7 21:10:21 2014
@@ -662,7 +662,8 @@ public class TestScriptLanguage {
PigStatsUtil.getEmptyPigStats();
// ExecMode.FILE
- stats = PigRunner.run(new String[] { "-f", scriptFile.getAbsolutePath(), "arg0",
+ stats = PigRunner.run(new String[] { "-x", cluster.getExecType().name().toLowerCase(),
+ "-f", scriptFile.getAbsolutePath(), "arg0",
file1 + "," + file2 }, null);
assertEquals(null, stats.getErrorMessage());
assertFileNotExists(file1, file2);
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-37
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-34 -> Tez vertex scope-36,
Tez vertex scope-35 -> Tez vertex scope-36,
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-25
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-13 -> Tez vertex scope-16,
Tez vertex scope-16
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld Fri Nov 7 21:10:21 2014
@@ -2,26 +2,26 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-20
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-18 -> Tez vertex scope-19,
Tez vertex scope-19
Tez vertex scope-18
# Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-33 -> scope-19
+b: Local Rearrange[tuple]{int}(false) - scope-32 -> scope-19
| |
-| Project[int][0] - scope-35
+| Project[int][0] - scope-34
|
-|---c: New For Each(false,false)[bag] - scope-22
+|---c: New For Each(false,false)[bag] - scope-21
| |
- | Project[int][0] - scope-23
+ | Project[int][0] - scope-22
| |
- | POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-24
+ | POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-23
| |
- | |---Project[tuple][1] - scope-25
+ | |---Project[tuple][1] - scope-24
|
- |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-36
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-35
|
|---a: New For Each(false,false)[bag] - scope-7
| |
@@ -36,19 +36,19 @@ b: Local Rearrange[tuple]{int}(false) -
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-19
# Combine plan on edge <scope-18>
-b: Local Rearrange[tuple]{int}(false) - scope-37 -> scope-19
+b: Local Rearrange[tuple]{int}(false) - scope-36 -> scope-19
| |
-| Project[int][0] - scope-39
+| Project[int][0] - scope-38
|
-|---c: New For Each(false,false)[bag] - scope-26
+|---c: New For Each(false,false)[bag] - scope-25
| |
- | Project[int][0] - scope-27
+ | Project[int][0] - scope-26
| |
- | POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-28
+ | POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-27
| |
- | |---Project[bag][1] - scope-29
+ | |---Project[bag][1] - scope-28
|
- |---b: Package(CombinerPackager)[tuple]{int} - scope-32
+ |---b: Package(CombinerPackager)[tuple]{int} - scope-31
# Plan on vertex
c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
|
@@ -56,8 +56,8 @@ c: Store(file:///tmp/output:org.apache.p
| |
| POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-13
| |
- | |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-21
+ | |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-20
| |
- | |---Project[bag][1] - scope-30
+ | |---Project[bag][1] - scope-29
|
|---b: Package(CombinerPackager)[tuple]{int} - scope-9
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-39
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-37 -> Tez vertex scope-36,
Tez vertex scope-38 -> Tez vertex scope-36,
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-39
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-36 -> Tez vertex scope-37,
Tez vertex scope-38 -> Tez vertex scope-37,
@@ -10,21 +10,21 @@ Tez vertex scope-37
Tez vertex scope-36
# Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-52 -> scope-37
+b: Local Rearrange[tuple]{int}(false) - scope-51 -> scope-37
| |
-| Project[int][0] - scope-54
+| Project[int][0] - scope-53
|
-|---b1: New For Each(false,false)[bag] - scope-40
+|---b1: New For Each(false,false)[bag] - scope-39
| |
- | Project[int][0] - scope-41
+ | Project[int][0] - scope-40
| |
- | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-42
+ | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-41
| |
- | |---Project[bag][1] - scope-43
+ | |---Project[bag][1] - scope-42
| |
- | |---Project[bag][1] - scope-44
+ | |---Project[bag][1] - scope-43
|
- |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-55
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-54
|
|---a: New For Each(false,false)[bag] - scope-7
| |
@@ -56,19 +56,19 @@ Local Rearrange[tuple]{int}(false) - sco
|---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-19
Tez vertex scope-37
# Combine plan on edge <scope-36>
-b: Local Rearrange[tuple]{int}(false) - scope-56 -> scope-37
+b: Local Rearrange[tuple]{int}(false) - scope-55 -> scope-37
| |
-| Project[int][0] - scope-58
+| Project[int][0] - scope-57
|
-|---b1: New For Each(false,false)[bag] - scope-45
+|---b1: New For Each(false,false)[bag] - scope-44
| |
- | Project[int][0] - scope-46
+ | Project[int][0] - scope-45
| |
- | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-47
+ | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-46
| |
- | |---Project[bag][1] - scope-48
+ | |---Project[bag][1] - scope-47
|
- |---b: Package(CombinerPackager)[tuple]{int} - scope-51
+ |---b: Package(CombinerPackager)[tuple]{int} - scope-50
# Plan on vertex
d: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-35
|
@@ -84,6 +84,6 @@ d: Store(file:///tmp/output/e:org.apache
| |
| POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-16
| |
- | |---Project[bag][1] - scope-49
+ | |---Project[bag][1] - scope-48
|
|---b: Package(CombinerPackager)[tuple]{int} - scope-9
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-17
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-16
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld Fri Nov 7 21:10:21 2014
@@ -2,28 +2,28 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-22
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-20 -> Tez vertex scope-21,
Tez vertex scope-21
Tez vertex scope-20
# Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-35 -> scope-21
+b: Local Rearrange[tuple]{int}(false) - scope-34 -> scope-21
| |
-| Project[int][0] - scope-37
+| Project[int][0] - scope-36
|
-|---c: New For Each(false,false)[bag] - scope-23
+|---c: New For Each(false,false)[bag] - scope-22
| |
- | Project[int][0] - scope-24
+ | Project[int][0] - scope-23
| |
- | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-25
+ | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-24
| |
- | |---Project[bag][0] - scope-26
+ | |---Project[bag][0] - scope-25
| |
- | |---Project[bag][1] - scope-27
+ | |---Project[bag][1] - scope-26
|
- |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-38
+ |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-37
|
|---a: New For Each(false,false)[bag] - scope-7
| |
@@ -38,19 +38,19 @@ b: Local Rearrange[tuple]{int}(false) -
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-21
# Combine plan on edge <scope-20>
-b: Local Rearrange[tuple]{int}(false) - scope-39 -> scope-21
+b: Local Rearrange[tuple]{int}(false) - scope-38 -> scope-21
| |
-| Project[int][0] - scope-41
+| Project[int][0] - scope-40
|
-|---c: New For Each(false,false)[bag] - scope-28
+|---c: New For Each(false,false)[bag] - scope-27
| |
- | Project[int][0] - scope-29
+ | Project[int][0] - scope-28
| |
- | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-30
+ | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-29
| |
- | |---Project[bag][1] - scope-31
+ | |---Project[bag][1] - scope-30
|
- |---b: Package(CombinerPackager)[tuple]{int} - scope-34
+ |---b: Package(CombinerPackager)[tuple]{int} - scope-33
# Plan on vertex
c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-19
|
@@ -60,6 +60,6 @@ c: Store(file:///tmp/output:org.apache.p
| |
| POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-16
| |
- | |---Project[bag][1] - scope-32
+ | |---Project[bag][1] - scope-31
|
|---b: Package(CombinerPackager)[tuple]{int} - scope-9
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-38
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-35 -> Tez vertex scope-37,
Tez vertex scope-36 -> Tez vertex scope-37,
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-15
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-10 -> Tez vertex scope-12,
Tez vertex scope-12
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-49
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-12 -> Tez vertex scope-33,Tez vertex scope-22,
Tez vertex scope-22 -> Tez vertex scope-33,
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld Fri Nov 7 21:10:21 2014
@@ -2,7 +2,7 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-71
+# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-33 -> Tez vertex scope-37,Tez vertex scope-60,Tez vertex scope-49,
Tez vertex scope-49 -> Tez vertex scope-60,
@@ -13,21 +13,21 @@ Tez vertex scope-67
Tez vertex scope-33
# Plan on vertex
-a: Split - scope-92
+a: Split - scope-91
| |
-| g: Local Rearrange[tuple]{chararray}(false) - scope-83 -> scope-37
+| g: Local Rearrange[tuple]{chararray}(false) - scope-82 -> scope-37
| | |
-| | Project[chararray][0] - scope-85
+| | Project[chararray][0] - scope-84
| |
-| |---h: New For Each(false,false)[bag] - scope-72
+| |---h: New For Each(false,false)[bag] - scope-71
| | |
-| | Project[chararray][0] - scope-73
+| | Project[chararray][0] - scope-72
| | |
-| | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-74
+| | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-73
| | |
-| | |---Project[bag][1] - scope-75
+| | |---Project[bag][1] - scope-74
| |
-| |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-86
+| |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-85
| |
| Local Rearrange[tuple]{tuple}(false) - scope-42 -> scope-49
| | |
@@ -88,19 +88,19 @@ POIdentityInOutTez - scope-61 <- scope-
| Project[int][1] - scope-12
Tez vertex scope-37
# Combine plan on edge <scope-33>
-g: Local Rearrange[tuple]{chararray}(false) - scope-87 -> scope-37
+g: Local Rearrange[tuple]{chararray}(false) - scope-86 -> scope-37
| |
-| Project[chararray][0] - scope-89
+| Project[chararray][0] - scope-88
|
-|---h: New For Each(false,false)[bag] - scope-76
+|---h: New For Each(false,false)[bag] - scope-75
| |
- | Project[chararray][0] - scope-77
+ | Project[chararray][0] - scope-76
| |
- | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-78
+ | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-77
| |
- | |---Project[bag][1] - scope-79
+ | |---Project[bag][1] - scope-78
|
- |---g: Package(CombinerPackager)[tuple]{chararray} - scope-82
+ |---g: Package(CombinerPackager)[tuple]{chararray} - scope-81
# Plan on vertex
POValueOutputTez - scope-70 -> [scope-67, scope-62]
|
@@ -108,7 +108,7 @@ POValueOutputTez - scope-70 -> [scope-6
| |
| POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-21
| |
- | |---Project[bag][1] - scope-80
+ | |---Project[bag][1] - scope-79
|
|---g: Package(CombinerPackager)[tuple]{chararray} - scope-17
Tez vertex scope-62