You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/07 22:10:23 UTC

svn commit: r1637449 [2/4] - in /pig/branches/branch-0.14: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/b...

Added: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1637449&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Nov  7 21:10:21 2014
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.tez;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import com.google.common.collect.Maps;
+
+/**
+ * TezPigScriptStats encapsulates the statistics collected from a running script executed in Tez mode.
+ * It includes the status of the execution, the Tez DAGs launched for the script, as well as
+ * information about the outputs and inputs of the script.
+ *
+ * TezPigScriptStats encapsulates multiple TezDAGStats. Each TezDAGStats in turn encapsulates
+ * multiple TezVertexStats.
+ */
+public class TezPigScriptStats extends PigStats {
+    private static final Log LOG = LogFactory.getLog(TezPigScriptStats.class);
+
+    private TezScriptState tezScriptState;
+    private Map<String, TezDAGStats> tezDAGStatsMap;
+
+    /**
+     * Visitor that walks a {@link TezPlanContainer} with a dependency-order walker and, for
+     * every container node, creates a {@link TezDAGStats}, adds it to the job graph, links it
+     * to the stats of the node's predecessors and registers it in the name-to-stats map.
+     */
+    private class DAGGraphBuilder extends TezPlanContainerVisitor {
+        public DAGGraphBuilder(TezPlanContainer planContainer) {
+            super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+        }
+
+        @Override
+        public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
+            // Register the per-DAG script info on the script state before building the stats.
+            TezDAGScriptInfo scriptInfo = TezScriptState.get().setDAGScriptInfo(tezPlanNode);
+
+            // Build the graph of stats for the operators inside this node's plan.
+            TezDAGStats.JobGraphBuilder vertexGraphBuilder =
+                    new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), scriptInfo);
+            vertexGraphBuilder.visit();
+
+            TezDAGStats dagStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(),
+                    vertexGraphBuilder.getJobPlan(), vertexGraphBuilder.getTezVertexStatsMap());
+            dagStats.setAlias(scriptInfo);
+            jobPlan.add(dagStats);
+
+            // Link to predecessors; with a dependency-order walk their stats are expected
+            // to be in the map already.
+            List<TezPlanContainerNode> predecessors = getPlan().getPredecessors(tezPlanNode);
+            if (predecessors != null) {
+                for (TezPlanContainerNode predecessor : predecessors) {
+                    TezDAGStats predecessorStats = tezDAGStatsMap.get(predecessor.getOperatorKey().toString());
+                    if (jobPlan.isConnected(predecessorStats, dagStats)) {
+                        continue;
+                    }
+                    jobPlan.connect(predecessorStats, dagStats);
+                }
+            }
+            tezDAGStatsMap.put(tezPlanNode.getOperatorKey().toString(), dagStats);
+        }
+    }
+
+    /**
+     * Creates a stats holder with an empty job graph and an empty DAG-name-to-stats map,
+     * bound to the current {@link ScriptState} (which must be a {@link TezScriptState}).
+     *
+     * @param pigContext the Pig context stored on this stats object
+     */
+    public TezPigScriptStats(PigContext pigContext) {
+        this.tezScriptState = (TezScriptState) ScriptState.get();
+        this.tezDAGStatsMap = Maps.newHashMap();
+        this.jobPlan = new JobGraph();
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * Invokes {@code start()} on the superclass and then builds the graph of DAG stats for
+     * the given plan container. A {@link FrontendException} raised while building the graph
+     * is logged as a warning and not rethrown.
+     *
+     * @param tezPlanContainer the plan container to build the DAG stats graph from
+     */
+    public void initialize(TezPlanContainer tezPlanContainer) {
+        super.start();
+        DAGGraphBuilder graphBuilder = new DAGGraphBuilder(tezPlanContainer);
+        try {
+            graphBuilder.visit();
+        } catch (FrontendException e) {
+            LOG.warn("Unable to build Tez DAG", e);
+        }
+    }
+
+    /**
+     * Invokes {@code stop()} on the superclass and then logs the collected script statistics.
+     */
+    public void finish() {
+        super.stop();
+        display();
+    }
+
+    /**
+     * Assembles a textual report (versions, user, file name, start/end time, features,
+     * outcome, error message on failure, per-DAG stats, inputs and outputs) and logs it
+     * at INFO level.
+     */
+    private void display() {
+        // Every header row uses the same "right-aligned key: left-aligned value" layout.
+        final String rowFormat = "%1$20s: %2$-100s%n";
+        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder report = new StringBuilder("\n");
+
+        report.append(String.format(rowFormat, "HadoopVersion", getHadoopVersion()))
+              .append(String.format(rowFormat, "PigVersion", getPigVersion()))
+              .append(String.format(rowFormat, "TezVersion", TezExecType.getTezVersion()))
+              .append(String.format(rowFormat, "UserId", userId))
+              .append(String.format(rowFormat, "FileName", getFileName()))
+              .append(String.format(rowFormat, "StartedAt", dateFormat.format(new Date(startTime))))
+              .append(String.format(rowFormat, "FinishedAt", dateFormat.format(new Date(endTime))))
+              .append(String.format(rowFormat, "Features", getFeatures()))
+              .append("\n");
+
+        String outcome;
+        if (returnCode == ReturnCode.SUCCESS) {
+            outcome = "Success!\n";
+        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
+            outcome = "Some tasks have failed! Stop running all dependent tasks\n";
+        } else {
+            outcome = "Failed!\n";
+        }
+        report.append(outcome).append("\n");
+
+        // Print diagnostic info in case of failure
+        boolean anyFailure = returnCode == ReturnCode.FAILURE
+                || returnCode == ReturnCode.PARTIAL_FAILURE;
+        if (anyFailure && errorMessage != null) {
+            String[] messageLines = errorMessage.split("\n");
+            for (int idx = 0; idx < messageLines.length; idx++) {
+                String text = messageLines[idx].trim();
+                boolean firstLine = idx == 0;
+                // The first line is always printed (it carries the label); later blank lines are dropped.
+                if (!firstLine && StringUtils.isEmpty(text)) {
+                    continue;
+                }
+                report.append(String.format(rowFormat, firstLine ? "ErrorMessage" : "", text));
+            }
+            report.append("\n");
+        }
+
+        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
+            report.append(perDag.getDisplayString()).append("\n");
+        }
+
+        report.append("Input(s):\n");
+        for (InputStats input : getInputStats()) {
+            report.append(input.getDisplayString().trim()).append("\n");
+        }
+        report.append("\n");
+
+        report.append("Output(s):\n");
+        for (OutputStats output : getOutputStats()) {
+            report.append(output.getDisplayString().trim()).append("\n");
+        }
+
+        LOG.info("Script Statistics:\n" + report.toString());
+    }
+
+    /**
+     * Updates the statistics after a DAG is finished and notifies the script state.
+     * <p>
+     * If no stats are registered under the job's name (for example because building the
+     * DAG graph in {@code initialize} failed, which is only logged), a warning is logged
+     * and the call returns without doing anything, instead of failing with a
+     * {@link NullPointerException}.
+     * <p>
+     * If the job has no DAG status, the DAG is marked unsuccessful and only the
+     * job-failed notification is emitted. Otherwise stats are accumulated, an
+     * output-completed notification is emitted per output, the DAG is marked successful
+     * or failed for the SUCCEEDED / FAILED states, and the DAG-completed notification is sent.
+     *
+     * @param tezJob the finished Tez job to collect statistics from
+     * @throws IOException propagated from the calls made on the job and its stats
+     */
+    public void accumulateStats(TezJob tezJob) throws IOException {
+        DAGStatus dagStatus = tezJob.getDAGStatus();
+        String dagName = tezJob.getName();
+        TezDAGStats tezDAGStats = tezDAGStatsMap.get(dagName);
+        if (tezDAGStats == null) {
+            // Stats collection is best-effort: do not crash when the DAG was never registered.
+            LOG.warn("No stats registered for Tez DAG " + dagName + ", skipping stats accumulation");
+            return;
+        }
+        if (dagStatus == null) {
+            tezDAGStats.setSuccessful(false);
+            tezScriptState.emitJobFailedNotification(tezDAGStats);
+            return;
+        }
+
+        tezDAGStats.accumulateStats(tezJob);
+        for (OutputStats output : tezDAGStats.getOutputs()) {
+            tezScriptState.emitOutputCompletedNotification(output);
+        }
+        DAGStatus.State state = dagStatus.getState();
+        if (state == DAGStatus.State.SUCCEEDED) {
+            tezDAGStats.setSuccessful(true);
+            tezScriptState.emitjobFinishedNotification(tezDAGStats);
+        } else if (state == DAGStatus.State.FAILED) {
+            tezDAGStats.setSuccessful(false);
+            tezDAGStats.setErrorMsg(tezJob.getDiagnostics());
+            tezScriptState.emitJobFailedNotification(tezDAGStats);
+        }
+        tezScriptState.dagCompletedNotification(dagName, tezDAGStats);
+    }
+
+    /**
+     * Records the job id of a native Tez operator and its outcome on the stats registered
+     * for the given DAG.
+     *
+     * @param dagName name under which the DAG stats are registered
+     * @param tezOper native operator whose job id is copied onto the stats
+     * @param success outcome to record on the stats
+     * @return the updated DAG stats, or {@code null} (after logging a warning) when no stats
+     *         are registered under {@code dagName}
+     */
+    public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper tezOper, boolean success) {
+        TezDAGStats js = tezDAGStatsMap.get(dagName);
+        if (js == null) {
+            // Same tolerance as getVertexStats: a missing DAG entry is not fatal.
+            LOG.warn("No stats registered for Tez DAG " + dagName + ", cannot record native job stats");
+            return null;
+        }
+        js.setJobId(tezOper.getJobId());
+        js.setSuccessful(success);
+        return js;
+    }
+
+    /**
+     * Looks up the stats of a vertex inside a DAG.
+     *
+     * @param dagName name under which the DAG stats are registered
+     * @param vertexName name of the vertex within that DAG
+     * @return whatever the DAG stats return for {@code vertexName}, or {@code null} when no
+     *         DAG stats are registered under {@code dagName}
+     */
+    public TezVertexStats getVertexStats(String dagName, String vertexName) {
+        TezDAGStats dagStats = tezDAGStatsMap.get(dagName);
+        if (dagStats == null) {
+            return null;
+        }
+        return dagStats.getVertexStats(vertexName);
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public JobClient getJobClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the sum of {@code getSMMSpillCount()} over all registered DAG stats
+     */
+    @Override
+    public long getSMMSpillCount() {
+        long total = 0;
+        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
+            total = total + perDag.getSMMSpillCount();
+        }
+        return total;
+    }
+
+    /**
+     * @return the sum of {@code getProactiveSpillCountObjects()} over all registered DAG stats
+     */
+    @Override
+    public long getProactiveSpillCountObjects() {
+        long total = 0;
+        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
+            total = total + perDag.getProactiveSpillCountObjects();
+        }
+        return total;
+    }
+
+    /**
+     * @return the sum of {@code getProactiveSpillCountRecs()} over all registered DAG stats
+     */
+    @Override
+    public long getProactiveSpillCountRecords() {
+        long total = 0;
+        for (TezDAGStats perDag : tezDAGStatsMap.values()) {
+            total = total + perDag.getProactiveSpillCountRecs();
+        }
+        return total;
+    }
+
+}

Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Nov  7 21:10:21 2014
@@ -20,9 +20,10 @@ package org.apache.pig.tools.pigstats.te
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,15 +32,21 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -52,9 +59,8 @@ import com.google.common.collect.Maps;
 public class TezScriptState extends ScriptState {
     private static final Log LOG = LogFactory.getLog(TezScriptState.class);
 
-    private Map<TezOperator, String> featureMap = null;
-    private Map<TezOperator, String> aliasMap = Maps.newHashMap();
-    private Map<TezOperator, String> aliasLocationMap = Maps.newHashMap();
+    private List<PigTezProgressNotificationListener> tezListeners = Lists.newArrayList();
+    private Map<String, TezDAGScriptInfo> dagScriptInfo = Maps.newHashMap();
 
     public TezScriptState(String id) {
         super(id);
@@ -64,13 +70,48 @@ public class TezScriptState extends Scri
         return (TezScriptState) ScriptState.get();
     }
 
-    public void addSettingsToConf(TezOperator tezOp, Configuration conf) {
+    /**
+     * Registers the listener with the superclass and, when it is also a
+     * {@link PigTezProgressNotificationListener}, remembers it for the Tez DAG notifications.
+     *
+     * @param listener the listener to register
+     */
+    @Override
+    public void registerListener(PigProgressNotificationListener listener) {
+        super.registerListener(listener);
+        if (!(listener instanceof PigTezProgressNotificationListener)) {
+            return;
+        }
+        tezListeners.add((PigTezProgressNotificationListener) listener);
+    }
+
+    /**
+     * Forwards a DAG-launch event, together with this script's id, to every registered
+     * Tez listener.
+     *
+     * @param dagName name of the DAG
+     * @param dagPlan plan passed through to the listeners
+     * @param numVerticesToLaunch vertex count passed through to the listeners
+     */
+    public void dagLaunchNotification(String dagName, OperatorPlan<?> dagPlan, int numVerticesToLaunch) {
+        for (PigTezProgressNotificationListener tezListener : tezListeners) {
+            tezListener.dagLaunchNotification(id, dagName, dagPlan, numVerticesToLaunch);
+        }
+    }
+
+    /**
+     * Forwards a DAG-started event, together with this script's id, to every registered
+     * Tez listener.
+     *
+     * @param dagName name of the DAG
+     * @param assignedApplicationId application id passed through to the listeners
+     */
+    public void dagStartedNotification(String dagName, String assignedApplicationId) {
+        for (PigTezProgressNotificationListener tezListener : tezListeners) {
+            tezListener.dagStartedNotification(id, dagName, assignedApplicationId);
+        }
+    }
+
+    /**
+     * Forwards a DAG-progress event, together with this script's id, to every registered
+     * Tez listener.
+     *
+     * @param dagName name of the DAG
+     * @param numVerticesCompleted completed-vertex count passed through to the listeners
+     * @param progress progress value passed through to the listeners
+     */
+    public void dagProgressNotification(String dagName, int numVerticesCompleted, int progress) {
+        for (PigTezProgressNotificationListener tezListener : tezListeners) {
+            tezListener.dagProgressNotification(id, dagName, numVerticesCompleted, progress);
+        }
+    }
+
+    /**
+     * Forwards a DAG-completed event to every registered Tez listener, passing this
+     * script's id, the DAG name, the stats' {@code isSuccessful()} value and the stats.
+     *
+     * @param dagName name of the DAG
+     * @param tezDAGStats stats of the completed DAG
+     */
+    public void dagCompletedNotification(String dagName, TezDAGStats tezDAGStats) {
+        for (PigTezProgressNotificationListener tezListener : tezListeners) {
+            tezListener.dagCompletedNotification(id, dagName, tezDAGStats.isSuccessful(), tezDAGStats);
+        }
+    }
+
+    public void addDAGSettingsToConf(Configuration conf) {
         LOG.info("Pig script settings are added to the job");
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
         conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+    }
+
+    public void addVertexSettingsToConf(String dagName, TezOperator tezOp, Configuration conf) {
 
         try {
             List<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
@@ -95,8 +136,8 @@ public class TezScriptState extends Scri
             LOG.warn("unable to get the map loads", e);
         }
 
-        setPigFeature(tezOp, conf);
-        setJobParents(tezOp, conf);
+        setPigFeature(dagName, tezOp, conf);
+        setJobParents(dagName, tezOp, conf);
 
         conf.set("mapreduce.workflow.id", "pig_" + id);
         conf.set("mapreduce.workflow.name", getFileName().isEmpty() ? "default" : getFileName());
@@ -116,32 +157,30 @@ public class TezScriptState extends Scri
         }
     }
 
-    private void setPigFeature(TezOperator tezOp, Configuration conf) {
-        conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(tezOp));
+    private void setPigFeature(String dagName, TezOperator tezOp, Configuration conf) {
+        if (tezOp.isVertexGroup()) {
+            return;
+        }
+        TezDAGScriptInfo dagInfo = getDAGScriptInfo(dagName);
+        conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), dagInfo.getPigFeatures(tezOp));
         if (scriptFeatures != 0) {
             conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
                     String.valueOf(scriptFeatures));
         }
-        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(tezOp));
-        conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(tezOp));
+        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), dagInfo.getAlias(tezOp));
+        conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), dagInfo.getAliasLocation(tezOp));
     }
 
-    private void setJobParents(TezOperator tezOp, Configuration conf) {
+    private void setJobParents(String dagName, TezOperator tezOp, Configuration conf) {
+        if (tezOp.isVertexGroup()) {
+            return;
+        }
         // PigStats maintains a job DAG with the job id being updated
         // upon available. Therefore, before a job is submitted, the ids
         // of its parent jobs are already available.
-        JobGraph jg = PigStats.get().getJobGraph();
-        JobStats js = null;
-        Iterator<JobStats> iter = jg.iterator();
-        while (iter.hasNext()) {
-            JobStats job = iter.next();
-            if (job.getName().equals(tezOp.getOperatorKey().toString())) {
-                js = job;
-                break;
-            }
-        }
+        JobStats js = ((TezPigScriptStats)PigStats.get()).getVertexStats(dagName, tezOp.getOperatorKey().toString());
         if (js != null) {
-            List<Operator> preds = jg.getPredecessors(js);
+            List<Operator> preds = js.getPlan().getPredecessors(js);
             if (preds != null) {
                 StringBuilder sb = new StringBuilder();
                 for (Operator op : preds) {
@@ -154,87 +193,173 @@ public class TezScriptState extends Scri
         }
     }
 
-    public String getAlias(TezOperator tezOp) {
-        if (!aliasMap.containsKey(tezOp)) {
-            setAlias(tezOp);
-        }
-        return aliasMap.get(tezOp);
+    public TezDAGScriptInfo setDAGScriptInfo(TezPlanContainerNode tezPlanNode) {
+        TezDAGScriptInfo info = new TezDAGScriptInfo(tezPlanNode.getTezOperPlan());
+        dagScriptInfo.put(tezPlanNode.getOperatorKey().toString(), info);
+        return info;
     }
 
-    private void setAlias(TezOperator tezOp) {
-        ArrayList<String> alias = new ArrayList<String>();
-        String aliasLocationStr = "";
-        try {
-            ArrayList<String> aliasLocation = new ArrayList<String>();
-            new AliasVisitor(tezOp.plan, alias, aliasLocation).visit();
-            aliasLocationStr += LoadFunc.join(aliasLocation, ",");
-            if (!alias.isEmpty()) {
-                Collections.sort(alias);
-            }
-        } catch (VisitorException e) {
-            LOG.warn("unable to get alias", e);
-        }
-        aliasMap.put(tezOp, LoadFunc.join(alias, ","));
-        aliasLocationMap.put(tezOp, aliasLocationStr);
+    public TezDAGScriptInfo getDAGScriptInfo(String dagName) {
+        return dagScriptInfo.get(dagName);
     }
 
-    public String getAliasLocation(TezOperator tezOp) {
-        if (!aliasLocationMap.containsKey(tezOp)) {
-            setAlias(tezOp);
-        }
-        return aliasLocationMap.get(tezOp);
-    }
+    static class TezDAGScriptInfo {
 
-    public String getPigFeature(TezOperator tezOp) {
-        if (featureMap == null) {
-            featureMap = Maps.newHashMap();
-        }
+        private static final Log LOG = LogFactory.getLog(TezDAGScriptInfo.class);
+        private TezOperPlan tezPlan;
+        private String alias;
+        private String aliasLocation;
+        private String features;
+
+        private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+        class DAGAliasVisitor extends TezOpPlanVisitor {
+
+            private Set<String> aliases;
+            private Set<String> aliasLocations;
+            private BitSet featureSet;
+
+            public DAGAliasVisitor(TezOperPlan plan) {
+                super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+                this.aliases = new HashSet<String>();
+                this.aliasLocations = new HashSet<String>();
+                this.featureSet = new BitSet();
+            }
+
+            @Override
+            public void visitTezOp(TezOperator tezOp) throws VisitorException {
+                if (tezOp.isVertexGroup()) {
+                    featureSet.set(PIG_FEATURE.UNION.ordinal());
+                    return;
+                }
+                ArrayList<String> aliasList = new ArrayList<String>();
+                String aliasLocationStr = "";
+                try {
+                    ArrayList<String> aliasLocationList = new ArrayList<String>();
+                    new AliasVisitor(tezOp.plan, aliasList, aliasLocationList).visit();
+                    aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+                    if (!aliasList.isEmpty()) {
+                        Collections.sort(aliasList);
+                        aliases.addAll(aliasList);
+                        aliasLocations.addAll(aliasLocationList);
+                    }
+                } catch (VisitorException e) {
+                    LOG.warn("unable to get alias", e);
+                }
+                aliasMap.put(tezOp.getOperatorKey(), LoadFunc.join(aliasList, ","));
+                aliasLocationMap.put(tezOp.getOperatorKey(), aliasLocationStr);
 
-        String retStr = featureMap.get(tezOp);
-        if (retStr == null) {
-            BitSet feature = new BitSet();
-            feature.clear();
-            if (tezOp.isSkewedJoin()) {
-                feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
-            }
-            if (tezOp.isGlobalSort()) {
-                feature.set(PIG_FEATURE.ORDER_BY.ordinal());
-            }
-            if (tezOp.isSampler()) {
-                feature.set(PIG_FEATURE.SAMPLER.ordinal());
-            }
-            if (tezOp.isIndexer()) {
-                feature.set(PIG_FEATURE.INDEXER.ordinal());
-            }
-            if (tezOp.isCogroup()) {
-                feature.set(PIG_FEATURE.COGROUP.ordinal());
-            }
-            if (tezOp.isGroupBy()) {
-                feature.set(PIG_FEATURE.GROUP_BY.ordinal());
-            }
-            if (tezOp.isRegularJoin()) {
-                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
-            }
-            if (tezOp.isUnion()) {
-                feature.set(PIG_FEATURE.UNION.ordinal());
+
+                BitSet feature = new BitSet();
+                feature.clear();
+                if (tezOp.isSkewedJoin()) {
+                    feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+                }
+                if (tezOp.isGlobalSort()) {
+                    feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+                }
+                if (tezOp.isSampler()) {
+                    feature.set(PIG_FEATURE.SAMPLER.ordinal());
+                }
+                if (tezOp.isIndexer()) {
+                    feature.set(PIG_FEATURE.INDEXER.ordinal());
+                }
+                if (tezOp.isCogroup()) {
+                    feature.set(PIG_FEATURE.COGROUP.ordinal());
+                }
+                if (tezOp.isGroupBy()) {
+                    feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+                }
+                if (tezOp.isRegularJoin()) {
+                    feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                }
+                if (tezOp.isUnion()) {
+                    feature.set(PIG_FEATURE.UNION.ordinal());
+                }
+                if (tezOp.isNative()) {
+                    feature.set(PIG_FEATURE.NATIVE.ordinal());
+                }
+                if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
+                    feature.set(PIG_FEATURE.LIMIT.ordinal());
+                }
+                try {
+                    new FeatureVisitor(tezOp.plan, feature).visit();
+                } catch (VisitorException e) {
+                    LOG.warn("Feature visitor failed", e);
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                featuresMap.put(tezOp.getOperatorKey(), sb.toString());
+                for (int i=0; i < feature.length(); i++) {
+                    if (feature.get(i)) {
+                        featureSet.set(i);
+                    }
+                }
             }
-            if (tezOp.isNative()) {
-                feature.set(PIG_FEATURE.NATIVE.ordinal());
+
+            @Override
+            public void visit() throws VisitorException {
+                super.visit();
+                if (!aliases.isEmpty()) {
+                    ArrayList<String> aliasList = new ArrayList<String>(aliases);
+                    ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations);
+                    Collections.sort(aliasList);
+                    Collections.sort(aliasLocationList);
+                    alias = LoadFunc.join(aliasList, ",");
+                    aliasLocation = LoadFunc.join(aliasLocationList, ",");
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i+1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                features = sb.toString();
             }
+
+        }
+
+        public TezDAGScriptInfo(TezOperPlan tezPlan) {
+            this.tezPlan = tezPlan;
+            initialize();
+        }
+
+        private void initialize() {
             try {
-                new FeatureVisitor(tezOp.plan, feature).visit();
+                new DAGAliasVisitor(tezPlan).visit();
             } catch (VisitorException e) {
-                LOG.warn("Feature visitor failed", e);
+                LOG.warn("Cannot calculate alias information for DAG", e);
             }
-            StringBuilder sb = new StringBuilder();
-            for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
-                if (sb.length() > 0) sb.append(",");
-                sb.append(PIG_FEATURE.values()[i].name());
-            }
-            retStr = sb.toString();
-            featureMap.put(tezOp, retStr);
         }
-        return retStr;
+
+        public String getAlias() {
+            return alias;
+        }
+
+        public String getAliasLocation() {
+            return aliasLocation;
+        }
+
+        public String getPigFeatures() {
+            return features;
+        }
+
+        public String getAlias(TezOperator tezOp) {
+            return aliasMap.get(tezOp.getOperatorKey());
+        }
+
+        public String getAliasLocation(TezOperator tezOp) {
+            return aliasLocationMap.get(tezOp.getOperatorKey());
+        }
+
+        public String getPigFeatures(TezOperator tezOp) {
+            return featuresMap.get(tezOp.getOperatorKey());
+        }
+
     }
 
 }

Added: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1637449&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Nov  7 21:10:21 2014
@@ -0,0 +1,368 @@
+package org.apache.pig.tools.pigstats.tez;
+
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.FS_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.PIG_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.PigCounters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import com.google.common.collect.Maps;
+
+/*
+ * TezVertexStats encapsulates the statistics collected from a Tez Vertex.
+ * It includes status of the execution as well as
+ * information about outputs and inputs of the Vertex.
+ */
+public class TezVertexStats extends JobStats {
+    private static final Log LOG = LogFactory.getLog(TezVertexStats.class);
+
+    private boolean isMapOpts;
+    private int parallelism;
+    // CounterGroup, Counter, Value
+    private Map<String, Map<String, Long>> counters = null;
+
+    private List<POStore> stores = null;
+    private List<FileSpec> loads = null;
+
+    private int numberMaps = 0;
+    private int numberReduces = 0;
+
+    private long mapInputRecords = 0;
+    private long mapOutputRecords = 0;
+    private long reduceInputRecords = 0;
+    private long reduceOutputRecords = 0;
+    private long spillCount = 0;
+    private long activeSpillCountObj = 0;
+    private long activeSpillCountRecs = 0;
+
+    private HashMap<String, Long> multiStoreCounters
+            = new HashMap<String, Long>();
+
+    public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
+        super(name, plan);
+        this.isMapOpts = isMapOpts;
+    }
+
+    @Override
+    public String getJobId() {
+        return name;
+    }
+
+    /** Dispatches to the visitor only when it is a JobGraphPrinter; other visitors are ignored. */
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (v instanceof JobGraphPrinter) {
+            ((JobGraphPrinter) v).visit(this);
+        }
+    }
+
+    /** Renders the vertex name, then the alias and feature rows when they are non-empty. */
+    @Override
+    public String getDisplayString() {
+        String[][] rows = {{"VertexName", name}, {"Alias", getAlias()}, {"Features", getFeature()}};
+        StringBuilder display = new StringBuilder();
+        for (String[] row : rows) {
+            if (row == rows[0] || (row[1] != null && !row[1].isEmpty())) {
+                display.append(String.format("%1$20s: %2$-100s%n", row[0], row[1]));
+            }
+        }
+        return display.toString();
+    }
+
+    /** Stores the configuration and deserializes the store and load lists from it. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        try {
+            // TODO: We should replace PIG_REDUCE_STORES with something else in
+            // tez. For now, we keep it since it's used in PigOutputFormat.
+            this.stores = (List<POStore>) ObjectSerializer.deserialize(
+                    conf.get(JobControlCompiler.PIG_REDUCE_STORES));
+            this.loads = (List<FileSpec>) ObjectSerializer.deserialize(conf.get("pig.inputs"));
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize the store or load list", e);
+        }
+    }
+
+    /** Reports whether a non-empty load list or a non-empty store list is present. */
+    public boolean hasLoadOrStore() {
+        // Both lists are null until setConf() assigns them, hence the null checks.
+        boolean hasLoads = loads != null && !loads.isEmpty();
+        boolean hasStores = stores != null && !stores.isEmpty();
+        return hasLoads || hasStores;
+    }
+
+    /**
+     * Records the state, parallelism and counters reported for this vertex and
+     * then builds its input and output statistics. When {@code status} is null
+     * only the HDFS byte totals are reset to -1.
+     */
+    public void accumulateStats(VertexStatus status, int parallelism) {
+        hdfsBytesRead = -1;
+        hdfsBytesWritten = -1;
+        if (status == null) {
+            return;
+        }
+        setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED));
+        this.parallelism = parallelism;
+        if (this.isMapOpts) {
+            numberMaps += parallelism;
+        } else {
+            numberReduces += parallelism;
+        }
+        // A status without counters yields null here; keep the map empty instead of failing.
+        counters = Maps.newHashMap();
+        TezCounters tezCounters = status.getVertexCounters();
+        if (tezCounters != null) {
+            for (CounterGroup grp : tezCounters) {
+                Map<String, Long> cntMap = Maps.newHashMap();
+                for (TezCounter cnt : grp) {
+                    cntMap.put(cnt.getName(), cnt.getValue());
+                }
+                counters.put(grp.getName(), cntMap);
+            }
+        }
+
+        Map<String, Long> fsCounters = counters.get(FS_COUNTER_GROUP);
+        if (fsCounters != null && fsCounters.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
+            hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ);
+        }
+        if (fsCounters != null && fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
+            hdfsBytesWritten = fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+        }
+        Map<String, Long> pigCounters = counters.get(PIG_COUNTER_GROUP);
+        if (pigCounters != null) {
+            // Counter maps are keyed by name, so look up each enum constant's name().
+            Long smmSpills = pigCounters.get(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT.name());
+            Long bagSpills = pigCounters.get(PigCounters.PROACTIVE_SPILL_COUNT_BAGS.name());
+            Long recSpills = pigCounters.get(PigCounters.PROACTIVE_SPILL_COUNT_RECS.name());
+            spillCount = smmSpills != null ? smmSpills : spillCount;
+            activeSpillCountObj = bagSpills != null ? bagSpills : activeSpillCountObj;
+            activeSpillCountRecs = recSpills != null ? recSpills : activeSpillCountRecs;
+        }
+        addInputStatistics();
+        addOutputStatistics();
+    }
+
+    public Map<String, Map<String, Long>> getCounters() {
+        return counters;
+    }
+
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    /** Adds one InputStats per load, using the vertex-wide counters (-1 where one is missing). */
+    public void addInputStatistics() {
+        if (loads == null) {
+            return;
+        }
+        final String recordsKey = TaskCounter.INPUT_RECORDS_PROCESSED.name();
+        for (FileSpec load : loads) {
+            long recordCount = -1;
+            long bytesRead = -1;
+            if (counters != null) {
+                Map<String, Long> taskGroup = counters.get(TASK_COUNTER_GROUP);
+                Map<String, Long> fsGroup = counters.get(FS_COUNTER_GROUP);
+                if (taskGroup != null && taskGroup.get(recordsKey) != null) {
+                    recordCount = taskGroup.get(recordsKey);
+                    // Added once per load to the map- or reduce-side total.
+                    if (this.isMapOpts) {
+                        mapInputRecords += recordCount;
+                    } else {
+                        reduceInputRecords += recordCount;
+                    }
+                }
+                if (fsGroup != null && fsGroup.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
+                    bytesRead = fsGroup.get(PigStatsUtil.HDFS_BYTES_READ);
+                }
+            }
+            boolean succeeded = (state == JobState.SUCCESS);
+            InputStats is = new InputStats(load.getFileName(), bytesRead, recordCount, succeeded);
+            is.setConf(conf);
+            inputs.add(is);
+        }
+    }
+
+    /**
+     * Adds one OutputStats per non-temporary store. Record counts come from the
+     * multi-store or task counters (-1 when unavailable); bytes come from the
+     * HDFS_BYTES_WRITTEN counter for single stores, else from getOutputSize().
+     */
+    public void addOutputStatistics() {
+        if (stores == null) {
+            return;
+        }
+        // counters is null until accumulateStats() receives a status; tolerate that here.
+        Map<String, Long> taskGroup = counters == null ? null : counters.get(TASK_COUNTER_GROUP);
+        Map<String, Long> fsGroup = counters == null ? null : counters.get(FS_COUNTER_GROUP);
+        Long fsBytesWritten = fsGroup == null ? null : fsGroup.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+        for (POStore sto : stores) {
+            if (sto.isTmpStore()) {
+                continue;
+            }
+            long records = -1;
+            if (counters != null) {
+                if (sto.isMultiStore()) {
+                    Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+                    if (msGroup != null) {
+                        multiStoreCounters.putAll(msGroup);
+                        Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
+                        if (n != null) records = n;
+                    }
+                } else if (taskGroup != null && taskGroup.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+                    records = taskGroup.get(TaskCounter.OUTPUT_RECORDS.name());
+                }
+                if (records != -1) {
+                    if (this.isMapOpts) {
+                        mapOutputRecords += records;
+                    } else {
+                        reduceOutputRecords += records;
+                    }
+                }
+            }
+            /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
+            long hdfsBytesWritten = (!sto.isMultiStore() && fsBytesWritten != null)
+                    ? fsBytesWritten : JobStats.getOutputSize(sto, conf);
+            OutputStats os = new OutputStats(sto.getSFile().getFileName(), hdfsBytesWritten,
+                    records, (state == JobState.SUCCESS));
+            os.setPOStore(sto);
+            os.setConf(conf);
+            outputs.add(os);
+        }
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberMaps() {
+        return numberMaps;
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberReduces() {
+        return numberReduces;
+    }
+
+    @Override
+    @Deprecated
+    public long getMaxMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMinMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getAvgMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMaxReduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMinReduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getAvgREduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapOutputRecords() {
+        return mapOutputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceInputRecords() {
+        return reduceInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceOutputRecords() {
+        return reduceOutputRecords;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        return spillCount;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        return activeSpillCountObj;
+    }
+
+    @Override
+    public long getProactiveSpillCountRecs() {
+        return activeSpillCountRecs;
+    }
+
+    @Override
+    @Deprecated
+    public Counters getHadoopCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiStoreCounters() {
+        return multiStoreCounters;
+    }
+
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+}

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestPigRunner.java Fri Nov  7 21:10:21 2014
@@ -45,23 +45,25 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestPigRunner {
 
-    private static MiniCluster cluster;
+    private static MiniGenericCluster cluster;
+    private static String execType;
 
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
@@ -69,7 +71,8 @@ public class TestPigRunner {
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        cluster = MiniCluster.buildCluster();
+        cluster = MiniGenericCluster.buildCluster();
+        execType = cluster.getExecType().name().toLowerCase();
         PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
         w.println("1\t2\t3");
         w.println("5\t3\t4");
@@ -89,6 +92,7 @@ public class TestPigRunner {
     @Before
     public void setUp() {
         deleteAll(new File(OUTPUT_FILE));
+        Util.resetStateForExecModeSwitch();
     }
 
     @Test
@@ -154,8 +158,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -189,8 +193,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats instanceof EmptyPigStats);
             assertTrue(stats.isSuccessful());
@@ -220,8 +224,8 @@ public class TestPigRunner {
         Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
 
         try {
-            String[] args = { inputInDfs.toString() };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, inputInDfs.toString() };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -248,11 +252,17 @@ public class TestPigRunner {
         w.println("C = limit B 2;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        String[] args = { PIG_FILE };
+        String[] args = { "-x", execType, PIG_FILE };
         try {
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 4);
+            if (execType.equals("tez")) {
+                assertEquals(stats.getJobGraph().size(), 1);
+                // 5 vertices
+                assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+            } else {
+                assertEquals(stats.getJobGraph().size(), 4);
+            }
             assertTrue(stats.getJobGraph().getSinks().size() == 1);
             assertTrue(stats.getJobGraph().getSources().size() == 1);
             JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
@@ -263,11 +273,20 @@ public class TestPigRunner {
             assertEquals(2, stats.getRecordWritten());
             assertEquals(12, stats.getBytesWritten());
 
-            assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
-                    0)).getAlias());
-            assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
-                    js).get(0)).getAlias());
-            assertEquals("B", js.getAlias());
+            if (execType.equals("tez")) {
+                assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                // TODO: alias is not set for sample-aggregation/partition/sort job.
+                //       Need to investigate
+                // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+                //        js).get(0)).getAlias());
+            } else {
+                assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+                        js).get(0)).getAlias());
+                assertEquals("B", js.getAlias());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -287,8 +306,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
             assertTrue(stats.getJobGraph().size() == 1);
             // Each output file should include the following:
@@ -336,10 +355,11 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 1);
+            assertEquals(stats.getJobGraph().size(), 1);
+
             // Each output file should include the following:
             // output:
             //   5\t3\t4\n
@@ -383,15 +403,19 @@ public class TestPigRunner {
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
             Iterator<JobStats> iter = stats.getJobGraph().iterator();
             while (iter.hasNext()) {
                  JobStats js=iter.next();
-                 if(js.getState().name().equals("FAILED")) {
-                     List<Operator> ops=stats.getJobGraph().getSuccessors(js);
-                     for(Operator op : ops ) {
-                         assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                 if (execType.equals("tez")) {
+                     assertEquals(js.getState().name(), "FAILED");
+                 } else {
+                     if(js.getState().name().equals("FAILED")) {
+                         List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+                         for(Operator op : ops ) {
+                             assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                         }
                      }
                  }
             }
@@ -410,7 +434,7 @@ public class TestPigRunner {
         w.println("C = foreach B generate group, COUNT(A);");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        String[] args = { "-c", PIG_FILE };
+        String[] args = { "-x", execType, "-c", PIG_FILE };
         PigStats stats = PigRunner.run(args, null);
         assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
         // TODO: error message has changed. Need to catch the new message generated from the
@@ -422,22 +446,23 @@ public class TestPigRunner {
 
     @Test
     public void simpleNegativeTest2() throws Exception {
-        String[] args = { "-c", "-e", "this is a test" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        String[] args = { "-x", execType, "-c", "-e", "this is a test" };
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
         assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
     }
 
     @Test
     public void simpleNegativeTest3() throws Exception {
-        String[] args = { "-c", "-y" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        String[] args = { "-x", execType, "-c", "-y" };
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
         assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
-        assertEquals("Found unknown option (-y) at position 2",
+        assertEquals("Found unknown option (-y) at position 4",
                 stats.getErrorMessage());
     }
 
     @Test
-    public void NagetiveTest() throws Exception {
+    public void streamNegativeTest() throws Exception {
+        Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType()));
         final String OUTPUT_FILE_2 = "output2";
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
@@ -451,21 +476,32 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
             assertTrue(!stats.isSuccessful());
-            assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
-            assertTrue(stats.getJobGraph().size() == 2);
-            JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
-            assertTrue(job.isSuccessful());
-            job = (JobStats)stats.getJobGraph().getSinks().get(0);
-            assertTrue(!job.isSuccessful());
-            assertTrue(stats.getOutputStats().size() == 3);
-            for (OutputStats output : stats.getOutputStats()) {
-                if (output.getName().equals("ee")) {
+            if (execType.equals("tez")) {
+                assertTrue(stats.getReturnCode() == ReturnCode.FAILURE);
+                assertTrue(stats.getJobGraph().size() == 1);
+                JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0);
+                assertTrue(!job.isSuccessful());
+                assertTrue(stats.getOutputStats().size() == 3);
+                for (OutputStats output : stats.getOutputStats()) {
                     assertTrue(!output.isSuccessful());
-                } else {
-                    assertTrue(output.isSuccessful());
+                }
+            } else {
+                assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+                assertTrue(stats.getJobGraph().size() == 2);
+                JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+                assertTrue(job.isSuccessful());
+                job = (JobStats)stats.getJobGraph().getSinks().get(0);
+                assertTrue(!job.isSuccessful());
+                assertTrue(stats.getOutputStats().size() == 3);
+                for (OutputStats output : stats.getOutputStats()) {
+                    if (output.getName().equals("ee")) {
+                        assertTrue(!output.isSuccessful());
+                    } else {
+                        assertTrue(output.isSuccessful());
+                    }
                 }
             }
         } finally {
@@ -519,8 +555,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -554,8 +590,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -586,8 +622,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -613,9 +649,9 @@ public class TestPigRunner {
         String jarName = Util.findPigJarName();
 
         String[] args = { "-Dpig.additional.jars=" + jarName,
-                "-Dmapred.job.queue.name=default",
+                "-Dmapred.job.queue.name=default", "-x", execType,
                 "-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());
+        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
         Util.deleteFile(cluster, OUTPUT_FILE);
         PigContext ctx = stats.getPigContext();
@@ -642,8 +678,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
         } finally {
             new File(PIG_FILE).delete();
@@ -658,8 +694,8 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
@@ -670,7 +706,7 @@ public class TestPigRunner {
 
     @Test // PIG-2006
     public void testEmptyFile() throws IOException {
-        File f1 = new File( PIG_FILE );
+        File f1 = new File(PIG_FILE);
 
         FileWriter fw1 = new FileWriter(f1);
         fw1.close();
@@ -698,7 +734,7 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
 
             assertTrue(!stats.isSuccessful());
@@ -721,7 +757,7 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, null);
 
             assertTrue(!stats.isSuccessful());
@@ -751,8 +787,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -783,8 +819,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
@@ -815,17 +851,22 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
             assertEquals(1, stats.getNumberJobs());
             List<InputStats> inputs = stats.getInputStats();
             assertEquals(2, inputs.size());
-            for (InputStats instats : inputs) {
-                // the multi-input counters are disabled
-                assertEquals(-1, instats.getNumberRecords());
+            if (execType.equals("tez")) {
+                assertEquals(5, inputs.get(0).getNumberRecords());
+                assertEquals(5, inputs.get(1).getNumberRecords());
+            } else {
+                for (InputStats instats : inputs) {
+                    // the multi-input counters are disabled
+                    assertEquals(-1, instats.getNumberRecords());
+                }
             }
 
             List<OutputStats> outputs = stats.getOutputStats();
@@ -853,27 +894,44 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+
+            String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP;
+            String FS_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP;
 
-            Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
-            assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
-            assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
-            assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
-            assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
-            assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
-
-            // Skip for hadoop 20.203+, See PIG-2446
-            if (Util.isHadoop203plus())
-                return;
+            if (execType.equals("tez")) {
+                Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+                assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        "INPUT_RECORDS_PROCESSED").getValue());
+                assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+                assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
+                        "OUTPUT_RECORDS").getValue());
+                assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+                assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            } else {
+                Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
+                assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
+                assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
+                assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
+                assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
+                assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
+
+                // Skip for hadoop 20.203+, See PIG-2446
+                if (Util.isHadoop203plus())
+                    return;
 
-            assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+                assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -892,17 +950,22 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-Dpig.disable.counter=true", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
 
             assertEquals(1, stats.getNumberJobs());
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
-            for (OutputStats outstats : outputs) {
-                // the multi-output counters are disabled
-                assertEquals(-1, outstats.getNumberRecords());
+            if (execType.equals("tez")) {
+                assertEquals(5, outputs.get(0).getNumberRecords()); // JUnit order: expected, actual
+                assertEquals(2, outputs.get(1).getNumberRecords());
+            } else {
+                for (OutputStats outstats : outputs) {
+                    // the multi-output counters are disabled
+                    assertEquals(-1, outstats.getNumberRecords());
+                }
             }
 
             List<InputStats> inputs = stats.getInputStats();
@@ -937,8 +1000,8 @@ public class TestPigRunner {
         w1.close();
 
         try {
-            String[] args = { "-F", PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener());
+            String[] args = { "-x", execType, "-F", PIG_FILE };
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(!stats.isSuccessful());
 
@@ -967,6 +1030,15 @@ public class TestPigRunner {
         private static final int JobsSubmitted = 1;
         private static final int JobStarted = 2;
         private static final int JobFinished = 3;
+        private String execType;
+
+        public TestNotificationListener(String execType) { // execType: same string the tests pass after "-x"
+            this.execType = execType;
+        }
+
+        public TestNotificationListener() {
+            this("mr"); // no exec type supplied: delegate with the literal "mr"
+        }
 
         @Override
         public void initialPlanNotification(String id, OperatorPlan<?> plan) {

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestScriptLanguage.java Fri Nov  7 21:10:21 2014
@@ -662,7 +662,8 @@ public class TestScriptLanguage {
         PigStatsUtil.getEmptyPigStats();
 
         // ExecMode.FILE
-        stats = PigRunner.run(new String[] { "-f", scriptFile.getAbsolutePath(), "arg0",
+        stats = PigRunner.run(new String[] { "-x", cluster.getExecType().name().toLowerCase(),
+                "-f", scriptFile.getAbsolutePath(), "arg0",
                 file1 + "," + file2 }, null);
         assertEquals(null, stats.getErrorMessage());
         assertFileNotExists(file1, file2);

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-37
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-34	->	Tez vertex scope-36,
 Tez vertex scope-35	->	Tez vertex scope-36,

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-25
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-13	->	Tez vertex scope-16,
 Tez vertex scope-16

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld Fri Nov  7 21:10:21 2014
@@ -2,26 +2,26 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-20
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-18	->	Tez vertex scope-19,
 Tez vertex scope-19
 
 Tez vertex scope-18
 # Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-33	->	 scope-19
+b: Local Rearrange[tuple]{int}(false) - scope-32	->	 scope-19
 |   |
-|   Project[int][0] - scope-35
+|   Project[int][0] - scope-34
 |
-|---c: New For Each(false,false)[bag] - scope-22
+|---c: New For Each(false,false)[bag] - scope-21
     |   |
-    |   Project[int][0] - scope-23
+    |   Project[int][0] - scope-22
     |   |
-    |   POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-24
+    |   POUserFunc(org.apache.pig.builtin.Distinct$Initial)[tuple] - scope-23
     |   |
-    |   |---Project[tuple][1] - scope-25
+    |   |---Project[tuple][1] - scope-24
     |
-    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-36
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-35
         |
         |---a: New For Each(false,false)[bag] - scope-7
             |   |
@@ -36,19 +36,19 @@ b: Local Rearrange[tuple]{int}(false) - 
             |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-19
 # Combine plan on edge <scope-18>
-b: Local Rearrange[tuple]{int}(false) - scope-37	->	 scope-19
+b: Local Rearrange[tuple]{int}(false) - scope-36	->	 scope-19
 |   |
-|   Project[int][0] - scope-39
+|   Project[int][0] - scope-38
 |
-|---c: New For Each(false,false)[bag] - scope-26
+|---c: New For Each(false,false)[bag] - scope-25
     |   |
-    |   Project[int][0] - scope-27
+    |   Project[int][0] - scope-26
     |   |
-    |   POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-28
+    |   POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-27
     |   |
-    |   |---Project[bag][1] - scope-29
+    |   |---Project[bag][1] - scope-28
     |
-    |---b: Package(CombinerPackager)[tuple]{int} - scope-32
+    |---b: Package(CombinerPackager)[tuple]{int} - scope-31
 # Plan on vertex
 c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-17
 |
@@ -56,8 +56,8 @@ c: Store(file:///tmp/output:org.apache.p
     |   |
     |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-13
     |   |
-    |   |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-21
+    |   |---POUserFunc(org.apache.pig.builtin.Distinct$Final)[bag] - scope-20
     |       |
-    |       |---Project[bag][1] - scope-30
+    |       |---Project[bag][1] - scope-29
     |
     |---b: Package(CombinerPackager)[tuple]{int} - scope-9

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-39
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-37	->	Tez vertex scope-36,
 Tez vertex scope-38	->	Tez vertex scope-36,

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-39
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-36	->	Tez vertex scope-37,
 Tez vertex scope-38	->	Tez vertex scope-37,
@@ -10,21 +10,21 @@ Tez vertex scope-37
 
 Tez vertex scope-36
 # Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-52	->	 scope-37
+b: Local Rearrange[tuple]{int}(false) - scope-51	->	 scope-37
 |   |
-|   Project[int][0] - scope-54
+|   Project[int][0] - scope-53
 |
-|---b1: New For Each(false,false)[bag] - scope-40
+|---b1: New For Each(false,false)[bag] - scope-39
     |   |
-    |   Project[int][0] - scope-41
+    |   Project[int][0] - scope-40
     |   |
-    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-42
+    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-41
     |   |
-    |   |---Project[bag][1] - scope-43
+    |   |---Project[bag][1] - scope-42
     |       |
-    |       |---Project[bag][1] - scope-44
+    |       |---Project[bag][1] - scope-43
     |
-    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-55
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-54
         |
         |---a: New For Each(false,false)[bag] - scope-7
             |   |
@@ -56,19 +56,19 @@ Local Rearrange[tuple]{int}(false) - sco
     |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-19
 Tez vertex scope-37
 # Combine plan on edge <scope-36>
-b: Local Rearrange[tuple]{int}(false) - scope-56	->	 scope-37
+b: Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-37
 |   |
-|   Project[int][0] - scope-58
+|   Project[int][0] - scope-57
 |
-|---b1: New For Each(false,false)[bag] - scope-45
+|---b1: New For Each(false,false)[bag] - scope-44
     |   |
-    |   Project[int][0] - scope-46
+    |   Project[int][0] - scope-45
     |   |
-    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-47
+    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-46
     |   |
-    |   |---Project[bag][1] - scope-48
+    |   |---Project[bag][1] - scope-47
     |
-    |---b: Package(CombinerPackager)[tuple]{int} - scope-51
+    |---b: Package(CombinerPackager)[tuple]{int} - scope-50
 # Plan on vertex
 d: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-35
 |
@@ -84,6 +84,6 @@ d: Store(file:///tmp/output/e:org.apache
         |   |
         |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-16
         |   |
-        |   |---Project[bag][1] - scope-49
+        |   |---Project[bag][1] - scope-48
         |
         |---b: Package(CombinerPackager)[tuple]{int} - scope-9

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-17
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-16
 

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld Fri Nov  7 21:10:21 2014
@@ -2,28 +2,28 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-22
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-20	->	Tez vertex scope-21,
 Tez vertex scope-21
 
 Tez vertex scope-20
 # Plan on vertex
-b: Local Rearrange[tuple]{int}(false) - scope-35	->	 scope-21
+b: Local Rearrange[tuple]{int}(false) - scope-34	->	 scope-21
 |   |
-|   Project[int][0] - scope-37
+|   Project[int][0] - scope-36
 |
-|---c: New For Each(false,false)[bag] - scope-23
+|---c: New For Each(false,false)[bag] - scope-22
     |   |
-    |   Project[int][0] - scope-24
+    |   Project[int][0] - scope-23
     |   |
-    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-25
+    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-24
     |   |
-    |   |---Project[bag][0] - scope-26
+    |   |---Project[bag][0] - scope-25
     |       |
-    |       |---Project[bag][1] - scope-27
+    |       |---Project[bag][1] - scope-26
     |
-    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-38
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-37
         |
         |---a: New For Each(false,false)[bag] - scope-7
             |   |
@@ -38,19 +38,19 @@ b: Local Rearrange[tuple]{int}(false) - 
             |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-21
 # Combine plan on edge <scope-20>
-b: Local Rearrange[tuple]{int}(false) - scope-39	->	 scope-21
+b: Local Rearrange[tuple]{int}(false) - scope-38	->	 scope-21
 |   |
-|   Project[int][0] - scope-41
+|   Project[int][0] - scope-40
 |
-|---c: New For Each(false,false)[bag] - scope-28
+|---c: New For Each(false,false)[bag] - scope-27
     |   |
-    |   Project[int][0] - scope-29
+    |   Project[int][0] - scope-28
     |   |
-    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-30
+    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-29
     |   |
-    |   |---Project[bag][1] - scope-31
+    |   |---Project[bag][1] - scope-30
     |
-    |---b: Package(CombinerPackager)[tuple]{int} - scope-34
+    |---b: Package(CombinerPackager)[tuple]{int} - scope-33
 # Plan on vertex
 c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-19
 |
@@ -60,6 +60,6 @@ c: Store(file:///tmp/output:org.apache.p
     |   |
     |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-16
     |   |
-    |   |---Project[bag][1] - scope-32
+    |   |---Project[bag][1] - scope-31
     |
     |---b: Package(CombinerPackager)[tuple]{int} - scope-9

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-38
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-35	->	Tez vertex scope-37,
 Tez vertex scope-36	->	Tez vertex scope-37,

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-15
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-10	->	Tez vertex scope-12,
 Tez vertex scope-12

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-49
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-12	->	Tez vertex scope-33,Tez vertex scope-22,
 Tez vertex scope-22	->	Tez vertex scope-33,

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld?rev=1637449&r1=1637448&r2=1637449&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld Fri Nov  7 21:10:21 2014
@@ -2,7 +2,7 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-71
+# TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-33	->	Tez vertex scope-37,Tez vertex scope-60,Tez vertex scope-49,
 Tez vertex scope-49	->	Tez vertex scope-60,
@@ -13,21 +13,21 @@ Tez vertex scope-67
 
 Tez vertex scope-33
 # Plan on vertex
-a: Split - scope-92
+a: Split - scope-91
 |   |
-|   g: Local Rearrange[tuple]{chararray}(false) - scope-83	->	 scope-37
+|   g: Local Rearrange[tuple]{chararray}(false) - scope-82	->	 scope-37
 |   |   |
-|   |   Project[chararray][0] - scope-85
+|   |   Project[chararray][0] - scope-84
 |   |
-|   |---h: New For Each(false,false)[bag] - scope-72
+|   |---h: New For Each(false,false)[bag] - scope-71
 |       |   |
-|       |   Project[chararray][0] - scope-73
+|       |   Project[chararray][0] - scope-72
 |       |   |
-|       |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-74
+|       |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-73
 |       |   |
-|       |   |---Project[bag][1] - scope-75
+|       |   |---Project[bag][1] - scope-74
 |       |
-|       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-86
+|       |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-85
 |   |
 |   Local Rearrange[tuple]{tuple}(false) - scope-42	->	 scope-49
 |   |   |
@@ -88,19 +88,19 @@ POIdentityInOutTez - scope-61	<-	 scope-
 |   Project[int][1] - scope-12
 Tez vertex scope-37
 # Combine plan on edge <scope-33>
-g: Local Rearrange[tuple]{chararray}(false) - scope-87	->	 scope-37
+g: Local Rearrange[tuple]{chararray}(false) - scope-86	->	 scope-37
 |   |
-|   Project[chararray][0] - scope-89
+|   Project[chararray][0] - scope-88
 |
-|---h: New For Each(false,false)[bag] - scope-76
+|---h: New For Each(false,false)[bag] - scope-75
     |   |
-    |   Project[chararray][0] - scope-77
+    |   Project[chararray][0] - scope-76
     |   |
-    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-78
+    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-77
     |   |
-    |   |---Project[bag][1] - scope-79
+    |   |---Project[bag][1] - scope-78
     |
-    |---g: Package(CombinerPackager)[tuple]{chararray} - scope-82
+    |---g: Package(CombinerPackager)[tuple]{chararray} - scope-81
 # Plan on vertex
 POValueOutputTez - scope-70	->	 [scope-67, scope-62]
 |
@@ -108,7 +108,7 @@ POValueOutputTez - scope-70	->	 [scope-6
     |   |
     |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-21
     |   |
-    |   |---Project[bag][1] - scope-80
+    |   |---Project[bag][1] - scope-79
     |
     |---g: Package(CombinerPackager)[tuple]{chararray} - scope-17
 Tez vertex scope-62