You are viewing a plain-text version of this content. (The link to the canonical version, referenced here in the original page, was lost in the plain-text conversion.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/11 16:57:16 UTC

svn commit: r1586670 [1/2] - in /pig/branches/tez: ivy/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapred...

Author: rohini
Date: Fri Apr 11 14:57:15 2014
New Revision: 1586670

URL: http://svn.apache.org/r1586670
Log:
PIG-3842: Pig on tez job hangs when AM has a failure and Multiquery fixes (rohini)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
Removed:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDAG.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC20.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC21.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC6.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC8.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC9.gld
Modified:
    pig/branches/tez/ivy/libraries.properties
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Fri Apr 11 14:57:15 2014
@@ -91,5 +91,5 @@ mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
 snappy.version=1.1.0.1
-tez.version=0.3.0-incubating
+tez.version=0.4.0-incubating
 parquet-pig-bundle.version=1.2.3

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Fri Apr 11 14:57:15 2014
@@ -23,8 +23,6 @@ import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -50,26 +48,15 @@ public class MultiQueryOptimizerTez exte
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
             List<TezOperator> succ_successors = new ArrayList<TezOperator>();
             for (TezOperator successor : successors) {
-                // don't want to be complicated by nested split
-                if (successor.isSplitter()) {
-                    continue;
-                }
+
                 // If has other dependency, don't merge into split,
                 if (getPlan().getPredecessors(successor).size()!=1) {
                     continue;
                 }
-                boolean containsBlacklistedOp = false;
-                for (PhysicalOperator op : successor.plan) {
-                    if (op instanceof POReservoirSample || op instanceof POPoissonSample) {
-                        containsBlacklistedOp = true;
-                        break;
-                    }
-                }
-                if (containsBlacklistedOp) {
-                    continue;
-                }
+
                 // Detect diamond shape, we cannot merge it into split, since Tez
                 // does not handle double edge between vertexes
+                // TODO: PIG-3876 to handle this by writing to same edge
                 boolean sharedSucc = false;
                 if (getPlan().getSuccessors(successor)!=null) {
                     for (TezOperator succ_successor : getPlan().getSuccessors(successor)) {
@@ -102,8 +89,6 @@ public class MultiQueryOptimizerTez exte
                 tezOp.plan.remove(firstNodeLeaf);
                 singleSplitee.plan.remove(secondNodeRoot);
 
-                //TODO remove filter all
-
                 tezOp.plan.merge(singleSplitee.plan);
                 tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc);
 
@@ -113,6 +98,7 @@ public class MultiQueryOptimizerTez exte
             } else {
                 POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0);
                 POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope()));
+                split.setAlias(valueOutput.getAlias());
                 for (TezOperator splitee : splittees) {
                     PhysicalOperator spliteeRoot =  splitee.plan.getRoots().get(0);
                     splitee.plan.remove(spliteeRoot);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Fri Apr 11 14:57:15 2014
@@ -27,19 +27,24 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
  * POStoreTez is used to write to a Tez MROutput
  */
-public class POStoreTez extends POStore implements TezOutput {
+public class POStoreTez extends POStore implements TezOutput, TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
     private transient MROutput output;
     private transient KeyValueWriter writer;
     private String outputKey;
+    private TezCounter outputRecordCounter;
 
     public POStoreTez(OperatorKey k) {
         super(k);
@@ -65,6 +70,24 @@ public class POStoreTez extends POStore 
     }
 
     @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        if (isMultiStore()) {
+            CounterGroup multiStoreGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            if (multiStoreGroup == null) {
+                processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            }
+            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+            if (name != null) {
+                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+            }
+        }
+    }
+
+    @Override
     public void replaceOutput(String oldOutputKey, String newOutputKey) {
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Apr 11 14:57:15 2014
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Apr 11 14:57:15 2014
@@ -671,11 +671,7 @@ public class TezCompiler extends PhyPlan
 
                     tezOp.plan.addAsLeaf(lr);
                     TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
-                    if (tezOp.getSplitOperatorKey() != null) {
-                        inputKeys.add(tezOp.getSplitOperatorKey().toString());
-                    } else {
-                        inputKeys.add(tezOp.getOperatorKey().toString());
-                    }
+                    inputKeys.add(tezOp.getOperatorKey().toString());
 
                     // Configure broadcast edges for replicated tables
                     edge.dataMovementType = DataMovementType.BROADCAST;
@@ -1971,9 +1967,11 @@ public class TezCompiler extends PhyPlan
                 splitOp.setSplitter(true);
                 phyToTezOpMap.put(op, splitOp);
                 output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                output.setAlias(op.getAlias());
                 splitOp.plan.addAsLeaf(output);
             }
             curTezOp = getTezOp();
+            curTezOp.setSplitParent(splitOp.getOperatorKey());
             tezPlan.add(curTezOp);
             output.addOutputKey(curTezOp.getOperatorKey().toString());
             TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
@@ -1981,6 +1979,7 @@ public class TezCompiler extends PhyPlan
             TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE);
             curTezOp.setRequestedParallelismByReference(splitOp);
             POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+            input.setAlias(op.getAlias());
             input.setInputKey(splitOp.getOperatorKey().toString());
             curTezOp.plan.addAsLeaf(input);
         } catch (Exception e) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Apr 11 14:57:15 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -92,6 +93,7 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -119,7 +121,7 @@ import org.apache.tez.runtime.library.in
 public class TezDagBuilder extends TezOpPlanVisitor {
     private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
 
-    private TezDAG dag;
+    private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
@@ -127,7 +129,7 @@ public class TezDagBuilder extends TezOp
     private String scope;
     private NodeIdGenerator nig;
 
-    public TezDagBuilder(PigContext pc, TezOperPlan plan, TezDAG dag,
+    public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pc;
@@ -136,6 +138,14 @@ public class TezDagBuilder extends TezOp
         this.dag = dag;
         this.scope = plan.getRoots().get(0).getOperatorKey().getScope();
         this.nig = NodeIdGenerator.getGenerator();
+
+        try {
+            // Add credentials from binary token file and get tokens for namenodes
+            // specified in mapreduce.job.hdfs-servers
+            SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+        } catch (IOException e) {
+            throw new RuntimeException("Error while fetching delegation tokens", e);
+        }
     }
 
     @Override
@@ -358,12 +368,17 @@ public class TezDagBuilder extends TezOp
         ProcessorDescriptor procDesc = new ProcessorDescriptor(
                 tezOp.getProcessorName());
 
+        // Pass physical plans to vertex as user payload.
+        JobConf payloadConf = new JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
+
+        // We do this so that dag.getCredentials(), job.getCredentials(),
+        // job.getConfiguration().getCredentials() all reference the same Credentials object
+        // Unfortunately there is no setCredentials() on Job
+        payloadConf.setCredentials(dag.getCredentials());
         // We won't actually use this job, but we need it to talk with the Load Store funcs
         @SuppressWarnings("deprecation")
-        Job job = new Job(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
-
-        // Pass physical plans to vertex as user payload.
-        Configuration payloadConf = job.getConfiguration();
+        Job job = new Job(payloadConf);
+        payloadConf = (JobConf) job.getConfiguration();
 
         if (tezOp.sampleOperator != null) {
             payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString());
@@ -574,19 +589,13 @@ public class TezDagBuilder extends TezOp
                     storeOutDescriptor, MROutputCommitter.class);
         }
 
-        if (stores.size() > 0) {
-            new PigOutputFormat().checkOutputSpecs(job);
-        }
-
         // LoadFunc and StoreFunc add delegation tokens to Job Credentials in
         // setLocation and setStoreLocation respectively. For eg: HBaseStorage
         // InputFormat add delegation token in getSplits and OutputFormat in
         // checkOutputSpecs. For eg: FileInputFormat and FileOutputFormat
-        dag.getCredentials().addAll(job.getCredentials());
-
-        // Add credentials from binary token file and get tokens for namenodes
-        // specified in mapreduce.job.hdfs-servers
-        SecurityHelper.populateTokenCache(job.getConfiguration(), dag.getCredentials());
+        if (stores.size() > 0) {
+            new PigOutputFormat().checkOutputSpecs(job);
+        }
 
         return vertex;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Apr 11 14:57:15 2014
@@ -19,9 +19,11 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +34,7 @@ import org.apache.tez.client.TezSession;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
@@ -51,15 +54,16 @@ public class TezJob extends ControlledJo
     private EnumSet<StatusGetOpts> statusGetOpts;
     private DAGStatus dagStatus;
     private Configuration conf;
-    private TezDAG dag;
+    private DAG dag;
     private DAGClient dagClient;
     private Map<String, LocalResource> requestAMResources;
     private TezSession tezSession;
     private boolean reuseSession;
     private TezCounters dagCounters;
-    private Map<String, Map<String, Long>> vertexCounters;
+    // Vertex, CounterGroup, Counter, Value
+    private Map<String, Map<String, Map<String, Long>>> vertexCounters;
 
-    public TezJob(TezConfiguration conf, TezDAG dag, Map<String, LocalResource> requestAMResources)
+    public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
             throws IOException {
         super(conf);
         this.conf = conf;
@@ -70,7 +74,7 @@ public class TezJob extends ControlledJo
         this.vertexCounters = Maps.newHashMap();
     }
 
-    public TezDAG getDag() {
+    public DAG getDag() {
         return dag;
     }
 
@@ -82,8 +86,12 @@ public class TezJob extends ControlledJo
         return dagCounters;
     }
 
-    public Map<String, Long> getVertexCounters(String name) {
-        return vertexCounters.get(name);
+    public Map<String, Map<String, Long>> getVertexCounters(String group) {
+        return vertexCounters.get(group);
+    }
+
+    public Map<String, Long> getVertexCounters(String group, String name) {
+        return vertexCounters.get(group).get(name);
     }
 
     @Override
@@ -149,17 +157,20 @@ public class TezJob extends ControlledJo
             String name = v.getVertexName();
             try {
                 VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
-                Map<String, Long> cntMap = Maps.newHashMap();
                 TezCounters counters = s.getVertexCounters();
+                Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
                 Iterator<CounterGroup> grpIt = counters.iterator();
                 while (grpIt.hasNext()) {
-                    Iterator<TezCounter> cntIt = grpIt.next().iterator();
+                    CounterGroup grp = grpIt.next();
+                    Iterator<TezCounter> cntIt = grp.iterator();
+                    Map<String, Long> cntMap = Maps.newHashMap();
                     while (cntIt.hasNext()) {
                         TezCounter cnt = cntIt.next();
                         cntMap.put(cnt.getName(), cnt.getValue());
                     }
+                    grpCounters.put(grp.getName(), cntMap);
                 }
-                vertexCounters.put(name, cntMap);
+                vertexCounters.put(name, grpCounters);
             } catch (Exception e) {
                 // Don't fail the job even if vertex counters couldn't
                 // be retrieved.
@@ -201,5 +212,24 @@ public class TezJob extends ControlledJo
         }
         return jobState;
     }
+
+    @Override
+    public synchronized String getMessage() {
+        return super.getMessage() + "\n " + getDiagnostics();
+    }
+
+    private String getDiagnostics() {
+        try {
+            if (dagClient != null && dagStatus == null) {
+                dagStatus = dagClient.getDAGStatus(new HashSet<StatusGetOpts>());
+            }
+            if (dagStatus != null) {
+                return StringUtils.join(dagStatus.getDiagnostics(), "\n");
+            }
+        } catch (Exception e) {
+            //Ignore
+        }
+        return "";
+    }
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Fri Apr 11 14:57:15 2014
@@ -17,26 +17,29 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop23.PigJobControl;
 import org.apache.pig.tools.pigstats.tez.TezStats;
 
 public class TezJobControl extends PigJobControl {
-    
+
+    private static final Log LOG = LogFactory.getLog(TezJobControl.class);
     private TezJobNotifier notifier = null;
     private TezStats stats = null;
 
     public TezJobControl(String groupName, int timeToSleep) {
         super(groupName, timeToSleep);
     }
-    
+
     public void setJobNotifier(TezJobNotifier notifier) {
         this.notifier = notifier;
     }
-    
+
     public void setTezStats(TezStats stats) {
         this.stats = stats;
     }
-    
+
     @Override
     public void run() {
         try {
@@ -57,13 +60,21 @@ public class TezJobControl extends PigJo
                 if (stats!=null) {
                     stats.accumulateStats(this);
                 }
-                if (notifier!=null) {
+                if (notifier != null) {
                     notifier.complete(this);
+                    notifier = null;
                 }
             }
         } catch (Exception e) {
             // should not happen
+            LOG.error("Unexpected error", e);
             throw new RuntimeException(e);
+        } finally {
+            // Try notify if not notified. Else process will hang.
+            if (notifier != null) {
+                notifier.complete(this);
+                notifier = null;
+            }
         }
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Fri Apr 11 14:57:15 2014
@@ -24,11 +24,13 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.impl.PigContext;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 
 /**
@@ -47,10 +49,11 @@ public class TezJobControlCompiler {
         this.tezConf = new TezConfiguration(conf);
     }
 
-    public TezDAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+    public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
             throws IOException, YarnException {
         String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
-        TezDAG tezDag = new TezDAG(jobName);
+        DAG tezDag = new DAG(jobName);
+        tezDag.setCredentials(new Credentials());
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
         dagBuilder.visit();
         return tezDag;
@@ -98,7 +101,7 @@ public class TezJobControlCompiler {
             Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
             localResources.putAll(planContainer.getLocalResources());
             localResources.putAll(tezPlan.getExtraResources());
-            TezDAG tezDag = buildDAG(tezPlan, localResources);
+            DAG tezDag = buildDAG(tezPlan, localResources);
             return new TezJob(tezConf, tezDag, localResources);
         } catch (Exception e) {
             int errCode = 2017;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Apr 11 14:57:15 2014
@@ -32,6 +32,7 @@ import org.apache.pig.backend.BackendExc
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
 import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -151,6 +152,9 @@ public class TezLauncher extends Launche
         TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
         TezOperPlan tezPlan = comp.compile();
 
+        NoopFilterRemover filter = new NoopFilterRemover(tezPlan);
+        filter.visit();
+
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri Apr 11 14:57:15 2014
@@ -142,7 +142,7 @@ public class TezOperPlan extends Operato
     @Override
     public boolean disconnect(TezOperator from, TezOperator to) {
         from.outEdges.remove(to.getOperatorKey());
-        to.outEdges.remove(from.getOperatorKey());
+        to.inEdges.remove(from.getOperatorKey());
         return super.disconnect(from, to);
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri Apr 11 14:57:15 2014
@@ -70,14 +70,12 @@ public class TezOperator extends Operato
     //int requestedMemory = 1024;
     //int requestedCpu = 1;
 
-    // Presence indicates that this TezOper is sub-plan of a POSplit.
-    // This is in-case when multi-query is turned on
-    // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
-    private OperatorKey splitOperatorKey = null;
-
     // This indicates that this TezOper is a split operator
     private boolean splitter;
 
+    // This indicates that this TezOper has POSplit as a predecessor.
+    private OperatorKey splitParent = null;
+
     // Indicates that the plan creation is complete
     boolean closed = false;
 
@@ -177,12 +175,12 @@ public class TezOperator extends Operato
         this.requestedParallelism = oper.requestedParallelism;
     }
 
-    public OperatorKey getSplitOperatorKey() {
-        return splitOperatorKey;
+    public OperatorKey getSplitParent() {
+        return splitParent;
     }
 
-    public void setSplitOperatorKey(OperatorKey splitOperatorKey) {
-        this.splitOperatorKey = splitOperatorKey;
+    public void setSplitParent(OperatorKey splitParent) {
+        this.splitParent = splitParent;
     }
 
     public void setSplitter(boolean spl) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Apr 11 14:57:15 2014
@@ -86,6 +86,8 @@ public class TezSessionManager {
         while (true) {
             TezSessionStatus status = tezSession.getSessionStatus();
             if (status.equals(TezSessionStatus.SHUTDOWN)) {
+                //TODO: TEZ-1017 Show diagnostics message
+                //log.error("TezSession has already shutdown. Diagnostics: " + tezSession.getSessionDiagnostics());
                 throw new RuntimeException("TezSession has already shutdown");
             }
             if (status.equals(TezSessionStatus.READY)) {

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java Fri Apr 11 14:57:15 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes them.
+ *
+ * Only operators with a split parent are examined. A POFilter is removed when
+ * its plan is a single ConstantExpression whose value is Boolean true.
+ */
+public class NoopFilterRemover extends TezOpPlanVisitor {
+
+    private static Log LOG = LogFactory.getLog(NoopFilterRemover.class); // not referenced in this class
+
+    public NoopFilterRemover(TezOperPlan plan) {
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); // visit operators in dependency order
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (tezOp.getSplitParent() == null) { // skip operators that do not have a POSplit predecessor
+            return;
+        }
+        try {
+            List<POFilter> filters = PlanHelper.getPhysicalOperators(tezOp.plan, POFilter.class); // NOTE(review): assumed to be a snapshot, since the loop below mutates tezOp.plan
+            for (POFilter filter : filters) {
+                PhysicalPlan filterPlan = filter.getPlan();
+                if (filterPlan.size() == 1) { // condition plan consists of a single operator
+                    PhysicalOperator fp = filterPlan.getRoots().get(0);
+                    if (fp instanceof ConstantExpression) {
+                        ConstantExpression exp = (ConstantExpression)fp;
+                        Object value = exp.getValue();
+                        if (value instanceof Boolean) {
+                            Boolean filterValue = (Boolean)value;
+                            if (filterValue) { // constant true: the filter passes everything through
+                                tezOp.plan.removeAndReconnect(filter);
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e); // visitTezOp may only throw VisitorException
+        }
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java Fri Apr 11 14:57:15 2014
@@ -18,19 +18,27 @@
 
 package org.apache.pig.tools.pigstats;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 /**
@@ -41,6 +49,8 @@ import org.apache.pig.tools.pigstats.Pig
 @InterfaceStability.Evolving
 public abstract class JobStats extends Operator {
 
+    private static final Log LOG = LogFactory.getLog(JobStats.class);
+
     public static final String ALIAS = "JobStatistics:alias";
     public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
     public static final String FEATURE = "JobStatistics:feature";
@@ -321,4 +331,37 @@ public abstract class JobStats extends O
     @Deprecated
     abstract public Map<String, Long> getMultiInputCounters();
 
+    /**
+     * Returns the output size of a store using the first reader class listed (comma-separated)
+     * in OUTPUT_SIZE_READER_KEY, default FileBasedOutputSizeReader, whose supports(sto) is true.
+     * @param sto the POStore whose output size is wanted
+     * @param conf configuration that may hold OUTPUT_SIZE_READER_KEY
+     * @return the size reported by that reader, or -1 on IOException or if no reader supports sto
+     */
+    public static long getOutputSize(POStore sto, Configuration conf) {
+        PigStatsOutputSizeReader reader = null;
+        String readerNames = conf.get(
+                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
+                FileBasedOutputSizeReader.class.getCanonicalName()); // used when the key is unset
+
+        for (String className : readerNames.split(",")) { // readers are tried in listed order
+            reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className); // NOTE(review): className is not trimmed after split(",")
+            if (reader.supports(sto)) { // the first supporting reader decides the result
+                LOG.info("using output size reader: " + className);
+                try {
+                    return reader.getOutputSize(sto, conf);
+                } catch (FileNotFoundException e) {
+                    LOG.warn("unable to find the output file", e);
+                    return -1;
+                } catch (IOException e) {
+                    LOG.warn("unable to get byte written of the job", e);
+                    return -1;
+                }
+            }
+        }
+
+        LOG.warn("unable to find an output size reader"); // no listed reader supports sto
+        return -1;
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Apr 11 14:57:15 2014
@@ -23,8 +23,8 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
@@ -45,31 +45,12 @@ public class PigStatsUtil {
     public static final String HDFS_BYTES_READ
             = "HDFS_BYTES_READ";
 
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_RECORD_COUNTER} instead.
-     */
-    @Deprecated
     public static final String MULTI_INPUTS_RECORD_COUNTER
             = "Input records from ";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_COUNTER_GROUP} instead.
-     */
-    @Deprecated
     public static final String MULTI_INPUTS_COUNTER_GROUP
             = "MultiInputCounters";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_RECORD_COUNTER} instead.
-     */
-    @Deprecated
     public static final String MULTI_STORE_RECORD_COUNTER
             = "Output records in ";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_COUNTER_GROUP} instead.
-     */
-    @Deprecated
     public static final String MULTI_STORE_COUNTER_GROUP
             = "MultiStoreCounters";
 
@@ -149,4 +130,56 @@ public class PigStatsUtil {
         PigStats.start(new EmbeddedPigStats(statsMap));
     }
 
+    /**
+     * Returns MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + the short name of fname.
+     * @param fname the input file name
+     * @param index the input index embedded in the counter name
+     * @return the counter name
+     */
+    public static String getMultiInputsCounterName(String fname, int index) {
+        String shortName = getShortName(fname);
+        return (shortName == null) ? null
+                : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
+    }
+
+    /**
+     * Returns MULTI_STORE_RECORD_COUNTER + "_" + the store index + "_" + the short
+     * name of the {@link POStore}'s output file.
+     * @param store the POStore
+     * @return the counter name
+     */
+    public static String getMultiStoreCounterName(POStore store) {
+        String shortName = getShortName(store.getSFile().getFileName());
+        return (shortName == null) ? null
+                : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
+    }
+
+    // Restrict total string size of a counter name to 64 characters.
+    // Leave 24 characters for prefix string.
+    private static final int COUNTER_NAME_LIMIT = 40;
+    private static final String SEPARATOR = "/";
+    private static final String SEMICOLON = ";";
+    // Last '/' segment before the first ';' (or end of uri), cut to its last COUNTER_NAME_LIMIT chars.
+    private static String getShortName(String uri) {
+        int scolon = uri.indexOf(SEMICOLON); // first ';', or -1 if absent
+        int slash;
+        if (scolon!=-1) {
+            slash = uri.lastIndexOf(SEPARATOR, scolon); // last '/' before the first ';'
+        } else {
+            slash = uri.lastIndexOf(SEPARATOR);
+        }
+        String shortName = null;
+        if (scolon==-1) {
+            shortName = uri.substring(slash+1);
+        }
+        if (slash < scolon) { // can only hold when a ';' was found
+            shortName = uri.substring(slash+1, scolon);
+        }
+        if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) { // keep only the trailing characters
+            shortName = shortName.substring(shortName.length()
+                    - COUNTER_NAME_LIMIT);
+        }
+        return shortName;
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Apr 11 14:57:15 2014
@@ -18,7 +18,6 @@
 
 package org.apache.pig.tools.pigstats.mapreduce;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,24 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -118,58 +114,80 @@ public final class MRJobStats extends Jo
 
     private Counters counters = null;
 
+    @Override
     public String getJobId() {
         return (jobId == null) ? null : jobId.toString();
     }
 
+    @Override
     public int getNumberMaps() { return numberMaps; }
 
+    @Override
     public int getNumberReduces() { return numberReduces; }
 
+    @Override
     public long getMaxMapTime() { return maxMapTime; }
 
+    @Override
     public long getMinMapTime() { return minMapTime; }
 
+    @Override
     public long getAvgMapTime() { return avgMapTime; }
 
+    @Override
     public long getMaxReduceTime() { return maxReduceTime; }
 
+    @Override
     public long getMinReduceTime() { return minReduceTime; }
 
+    @Override
     public long getAvgReduceTime() { return avgReduceTime; }
 
+    @Override
     public long getMapInputRecords() { return mapInputRecords; }
 
+    @Override
     public long getMapOutputRecords() { return mapOutputRecords; }
 
+    @Override
     public long getReduceInputRecords() { return reduceInputRecords; }
 
+    @Override
     public long getReduceOutputRecords() { return reduceOutputRecords; }
 
+    @Override
     public long getSMMSpillCount() { return spillCount; }
 
+    @Override
     public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
 
+    @Override
     public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
 
+    @Override
     public Counters getHadoopCounters() { return counters; }
 
+    @Override
     public Map<String, Long> getMultiStoreCounters() {
         return Collections.unmodifiableMap(multiStoreCounters);
     }
 
+    @Override
     public Map<String, Long> getMultiInputCounters() {
         return Collections.unmodifiableMap(multiInputCounters);
     }
 
+    @Override
     public String getAlias() {
         return (String)getAnnotation(ALIAS);
     }
 
+    @Override
     public String getAliasLocation() {
         return (String)getAnnotation(ALIAS_LOCATION);
     }
 
+    @Override
     public String getFeature() {
         return (String)getAnnotation(FEATURE);
     }
@@ -219,6 +237,7 @@ public final class MRJobStats extends Jo
         medianReduceTime = median;
     }
 
+    @Override
     public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         String id = (jobId == null) ? "N/A" : jobId.toString();
@@ -420,39 +439,6 @@ public final class MRJobStats extends Jo
         }
     }
 
-    /**
-     * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
-     * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
-     * defaults to FileBasedOutputSizeReader.
-     * @param sto POStore
-     * @param conf configuration
-     */
-    static long getOutputSize(POStore sto, Configuration conf) {
-        PigStatsOutputSizeReader reader = null;
-        String readerNames = conf.get(
-                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
-                FileBasedOutputSizeReader.class.getCanonicalName());
-
-        for (String className : readerNames.split(",")) {
-            reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
-            if (reader.supports(sto)) {
-                LOG.info("using output size reader: " + className);
-                try {
-                    return reader.getOutputSize(sto, conf);
-                } catch (FileNotFoundException e) {
-                    LOG.warn("unable to find the output file", e);
-                    return -1;
-                } catch (IOException e) {
-                    LOG.warn("unable to get byte written of the job", e);
-                    return -1;
-                }
-            }
-        }
-
-        LOG.warn("unable to find an output size reader");
-        return -1;
-    }
-
     private void addOneOutputStats(POStore sto) {
         long records = -1;
         if (sto.isMultiStore()) {

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Apr 11 14:57:15 2014
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats.ma
 
 import java.io.IOException;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.Counters;
@@ -32,16 +33,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
-import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.JobStats;
 
 
 
@@ -54,21 +52,9 @@ public class MRPigStatsUtil extends PigS
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
             = HadoopShims.getFsCounterGroupName();
-    public static final String MULTI_INPUTS_RECORD_COUNTER
-            = "Input records from ";
-    public static final String MULTI_INPUTS_COUNTER_GROUP
-            = "MultiInputCounters";
-    public static final String MULTI_STORE_RECORD_COUNTER
-            = "Output records in ";
-    public static final String MULTI_STORE_COUNTER_GROUP
-            = "MultiStoreCounters";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 
-    // Restrict total string size of a counter name to 64 characters.
-    // Leave 24 characters for prefix string.
-    private static final int COUNTER_NAME_LIMIT = 40;
-
     /**
      * Returns the count for the given counter name in the counter group
      * 'MultiStoreCounters'
@@ -95,55 +81,6 @@ public class MRPigStatsUtil extends PigS
     }
 
     /**
-     * Returns the counter name for the given {@link POStore}
-     *
-     * @param store the POStore
-     * @return the counter name
-     */
-    public static String getMultiStoreCounterName(POStore store) {
-        String shortName = getShortName(store.getSFile().getFileName());
-        return (shortName == null) ? null
-                : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
-    }
-
-    /**
-     * Returns the counter name for the given input file name
-     *
-     * @param fname the input file name
-     * @return the counter name
-     */
-    public static String getMultiInputsCounterName(String fname, int index) {
-        String shortName = getShortName(fname);
-        return (shortName == null) ? null
-                : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
-    }
-
-    private static final String SEPARATOR = "/";
-    private static final String SEMICOLON = ";";
-
-    private static String getShortName(String uri) {
-        int scolon = uri.indexOf(SEMICOLON);
-        int slash;
-        if (scolon!=-1) {
-            slash = uri.lastIndexOf(SEPARATOR, scolon);
-        } else {
-            slash = uri.lastIndexOf(SEPARATOR);
-        }
-        String shortName = null;
-        if (scolon==-1) {
-            shortName = uri.substring(slash+1);
-        }
-        if (slash < scolon) {
-            shortName = uri.substring(slash+1, scolon);
-        }
-        if (shortName != null && shortName.length() > COUNTER_NAME_LIMIT) {
-            shortName = shortName.substring(shortName.length()
-                    - COUNTER_NAME_LIMIT);
-        }
-        return shortName;
-    }
-
-    /**
      * Starts collecting statistics for the given MR plan
      *
      * @param pc the Pig context

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Fri Apr 11 14:57:15 2014
@@ -23,6 +23,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +45,9 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
@@ -55,12 +59,9 @@ import com.google.common.collect.Maps;
 public class TezStats extends PigStats {
     private static final Log LOG = LogFactory.getLog(TezStats.class);
 
-    public static final String DAG_COUNTER =
-            "org.apache.tez.common.counters.DAGCounter";
-    public static final String FS_COUNTER =
-            "org.apache.tez.common.counters.FileSystemCounter";
-    public static final String TASK_COUNTER =
-            "org.apache.tez.common.counters.TaskCounter";
+    public static final String DAG_COUNTER_GROUP = DAGCounter.class.getName();
+    public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
+    public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
 
     private List<String> dagStatsStrings;
     private Map<String, TezTaskStats> tezOpVertexMap;
@@ -152,7 +153,9 @@ public class TezStats extends PigStats {
                 String[] lines = errorMessage.split("\n");
                 for (int i = 0; i < lines.length; i++) {
                     String s = lines[i].trim();
-                    sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+                    if (i == 0 || !StringUtils.isEmpty(s)) {
+                        sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+                    }
                 }
                 sb.append("\n");
             }
@@ -163,15 +166,14 @@ public class TezStats extends PigStats {
             sb.append("\n");
         }
 
-        List<InputStats> is = getInputStats();
-        for (int i = 0; i < is.size(); i++) {
-            String s = is.get(i).getDisplayString(isLocal).trim();
-            sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "Input(s)" : "", s));
-        }
-        List<OutputStats> os = getOutputStats();
-        for (int i = 0; i < os.size(); i++) {
-            String s = os.get(i).getDisplayString(isLocal).trim();
-            sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "Output(s)" : "", s));
+        sb.append("Input(s):\n");
+        for (InputStats is : getInputStats()) {
+            sb.append(is.getDisplayString(isLocal).trim()).append("\n");
+        }
+        sb.append("\n");
+        sb.append("Output(s):\n");
+        for (OutputStats os : getOutputStats()) {
+            sb.append(os.getDisplayString(isLocal).trim()).append("\n");
         }
         LOG.info("Script Statistics:\n" + sb.toString());
     }
@@ -203,33 +205,42 @@ public class TezStats extends PigStats {
             }
         }
         if (!succeeded) {
-            errorMessage = tezJob.getMessage();
+            errorMessage = tezJob.getMessage().trim();
         }
     }
 
     private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
-            Map<String, Long> counters) {
+            Map<String, Map<String, Long>> map) {
         TezTaskStats stats = tezOpVertexMap.get(tezOpName);
         stats.setConf(conf);
         stats.setId(tezOpName);
         stats.setSuccessful(succeeded);
-        stats.addInputStatistics(counters);
-        stats.addOutputStatistics(counters);
+        if (map == null) {
+            if (stats.hasLoadOrStore()) {
+                LOG.warn("Unable to get input(s)/output(s) of the job");
+            }
+        } else {
+            stats.addInputStatistics(map);
+            stats.addOutputStatistics(map);
+        }
     }
 
     private static String getDisplayString(TezJob tezJob) {
         StringBuilder sb = new StringBuilder();
         TezCounters cnt = tezJob.getDagCounters();
+        if (cnt == null) {
+            return "";
+        }
 
         sb.append(String.format("%1$20s: %2$-100s%n", "JobId",
                 tezJob.getJobID()));
 
-        CounterGroup dagGrp = cnt.getGroup(DAG_COUNTER);
+        CounterGroup dagGrp = cnt.getGroup(DAG_COUNTER_GROUP);
         TezCounter numTasks = dagGrp.findCounter("TOTAL_LAUNCHED_TASKS");
         sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
                 numTasks.getValue()));
 
-        CounterGroup fsGrp = cnt.getGroup(FS_COUNTER);
+        CounterGroup fsGrp = cnt.getGroup(FS_COUNTER_GROUP);
         TezCounter bytesRead = fsGrp.findCounter("FILE_BYTES_READ");
         TezCounter bytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN");
         sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Fri Apr 11 14:57:15 2014
@@ -1,5 +1,8 @@
 package org.apache.pig.tools.pigstats.tez;
 
+import static org.apache.pig.tools.pigstats.tez.TezStats.FS_COUNTER_GROUP;
+import static org.apache.pig.tools.pigstats.tez.TezStats.TASK_COUNTER_GROUP;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -20,12 +23,12 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.tez.common.counters.TaskCounter;
 
 public class TezTaskStats extends JobStats {
     private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
 
     private String vertexName;
-    private Configuration conf;
     private List<POStore> stores = null;
     private List<FileSpec> loads = null;
 
@@ -63,6 +66,7 @@ public class TezTaskStats extends JobSta
         return sb.toString();
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void setConf(Configuration conf) {
         super.setConf(conf);
@@ -78,9 +82,8 @@ public class TezTaskStats extends JobSta
         }
     }
 
-    public void addInputStatistics(Map<String, Long> counters) {
-        if (inputs == null) {
-            LOG.warn("Unable to get inputs of the job");
+    public void addInputStatistics(Map<String, Map<String, Long>> map) {
+        if (loads == null) {
             return;
         }
 
@@ -88,11 +91,19 @@ public class TezTaskStats extends JobSta
             long records = -1;
             long hdfsBytesRead = -1;
             String filename = fs.getFileName();
-            if (counters.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
-                records = counters.get(PigStatsUtil.MAP_INPUT_RECORDS);
+            Map<String, Long> taskCounters = map.get(TASK_COUNTER_GROUP);
+            if (taskCounters != null) {
+                Long processed = taskCounters.get("INPUT_RECORDS_PROCESSED"); // TaskCounter.INPUT_RECORDS_PROCESSED
+                if (processed != null) {
+                    records = processed;
+                } else if (taskCounters.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
+                    // Tez 0.3 has MAP_INPUT_RECORDS TODO: Remove after we move away from Tez 0.3
+                    records = taskCounters.get(PigStatsUtil.MAP_INPUT_RECORDS);
+                }
             }
-            if (counters.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
-                hdfsBytesRead = counters.get(PigStatsUtil.HDFS_BYTES_READ);
+            Map<String, Long> fsCounters = map.get(FS_COUNTER_GROUP);
+            if (fsCounters != null && fsCounters.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
+                hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ);
             }
             InputStats is = new InputStats(filename, hdfsBytesRead,
                     records, (state == JobState.SUCCESS));
@@ -101,22 +112,32 @@ public class TezTaskStats extends JobSta
         }
     }
 
-    public void addOutputStatistics(Map<String, Long> counters) {
+    public void addOutputStatistics(Map<String, Map<String, Long>> map) {
         if (stores == null) {
-            LOG.warn("Unable to get stores of the job");
             return;
         }
 
         for (POStore sto : stores) {
             long records = -1;
-            long hdfsBytesWritten = -1;
+            long hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
             String filename = sto.getSFile().getFileName();
-            if (counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
-                records = counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS);
+            Map<String, Long> taskCounters = map.get(TASK_COUNTER_GROUP);
+            if (sto.isMultiStore()) {
+                // Counter groups can be absent from the map (see the null checks below); avoid an NPE
+                Map<String, Long> msCounters = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+                Long n = msCounters == null ? null : msCounters.get(PigStatsUtil.getMultiStoreCounterName(sto));
+                if (n != null) {
+                    records = n;
+                }
             }
-            if (counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
-                hdfsBytesWritten = counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+            if (!sto.isMultiStore() && taskCounters != null) {
+                if (taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+                    records = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
+                } else if (taskCounters.get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
+                    // Tez 0.3 has MAP_OUTPUT_RECORDS TODO: Remove after we move away from Tez 0.3
+                    records = taskCounters.get(PigStatsUtil.MAP_OUTPUT_RECORDS);
+                }
             }
             OutputStats os = new OutputStats(filename, hdfsBytesWritten,
                     records, (state == JobState.SUCCESS));
             os.setPOStore(sto);
@@ -232,4 +253,12 @@ public class TezTaskStats extends JobSta
     public Map<String, Long> getMultiInputCounters() {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Returns true when this vertex has at least one load or at least one store.
+     */
+    public boolean hasLoadOrStore() {
+        return (loads != null && !loads.isEmpty())
+                || (stores != null && !stores.isEmpty());
+    }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java?rev=1586670&r1=1586669&r2=1586670&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java Fri Apr 11 14:57:15 2014
@@ -38,8 +38,11 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.plan.VisitorException;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+// ant sends abstract classes also to junit. skipNonTests attribute does not work with all ant versions
+@Ignore
 public abstract class TestSecondarySort {
     protected static MiniGenericCluster cluster = null;
     protected PigServer pigServer;
@@ -476,7 +479,7 @@ public abstract class TestSecondarySort 
 
         Util.deleteFile(cluster, clusterFilePath);
     }
-    
+
     @Test
     // Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
     public void testCustomPartitionerWithSort() throws Exception {
@@ -502,9 +505,9 @@ public abstract class TestSecondarySort 
         } catch (Exception e) {
             captureException = true;
         }
-        
+
         assertTrue(captureException);
-        
+
         Util.deleteFile(cluster, clusterPath);
     }
 }

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,65 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-66
+#--------------------------------------------------
+Tez vertex scope-58	->	Tez vertex scope-60,Tez vertex scope-62,Tez vertex scope-64,
+Tez vertex scope-64
+Tez vertex scope-62
+Tez vertex scope-60
+
+Tez vertex scope-58
+# Plan on vertex
+POValueOutputTez - scope-59	->	 [scope-64, scope-62, scope-60]
+|
+|---a: New For Each(false,false)[bag] - scope-41
+    |   |
+    |   Cast[int] - scope-36
+    |   |
+    |   |---Project[bytearray][0] - scope-35
+    |   |
+    |   Cast[int] - scope-39
+    |   |
+    |   |---Project[bytearray][1] - scope-38
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-34
+Tez vertex scope-64
+# Plan on vertex
+d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-57
+|
+|---d: Filter[bag] - scope-53
+    |   |
+    |   Greater Than[boolean] - scope-56
+    |   |
+    |   |---Project[int][0] - scope-54
+    |   |
+    |   |---Constant(10) - scope-55
+    |
+    |---POValueInputTez - scope-65	<-	 scope-58
+Tez vertex scope-62
+# Plan on vertex
+c: Store(file:///tmp/output/c:org.apache.pig.builtin.PigStorage) - scope-52
+|
+|---c: Filter[bag] - scope-48
+    |   |
+    |   Less Than or Equal[boolean] - scope-51
+    |   |
+    |   |---Project[int][0] - scope-49
+    |   |
+    |   |---Constant(10) - scope-50
+    |
+    |---POValueInputTez - scope-63	<-	 scope-58
+Tez vertex scope-60
+# Plan on vertex
+b: Store(file:///tmp/output/b:org.apache.pig.builtin.PigStorage) - scope-47
+|
+|---b: Filter[bag] - scope-43
+    |   |
+    |   Less Than or Equal[boolean] - scope-46
+    |   |
+    |   |---Project[int][0] - scope-44
+    |   |
+    |   |---Constant(5) - scope-45
+    |
+    |---POValueInputTez - scope-61	<-	 scope-58

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,53 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-33
+#--------------------------------------------------
+Tez vertex scope-24
+
+Tez vertex scope-24
+# Plan on vertex
+1-1: Split - scope-32
+|   |
+|   b: Store(file:///tmp/output/b:org.apache.pig.builtin.PigStorage) - scope-13
+|   |
+|   |---b: Filter[bag] - scope-9
+|       |   |
+|       |   Less Than or Equal[boolean] - scope-12
+|       |   |
+|       |   |---Project[int][0] - scope-10
+|       |   |
+|       |   |---Constant(5) - scope-11
+|   |
+|   c: Store(file:///tmp/output/c:org.apache.pig.builtin.PigStorage) - scope-18
+|   |
+|   |---c: Filter[bag] - scope-14
+|       |   |
+|       |   Less Than or Equal[boolean] - scope-17
+|       |   |
+|       |   |---Project[int][0] - scope-15
+|       |   |
+|       |   |---Constant(10) - scope-16
+|   |
+|   d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-23
+|   |
+|   |---d: Filter[bag] - scope-19
+|       |   |
+|       |   Greater Than[boolean] - scope-22
+|       |   |
+|       |   |---Project[int][0] - scope-20
+|       |   |
+|       |   |---Constant(10) - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0

Added: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld?rev=1586670&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld (added)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld Fri Apr 11 14:57:15 2014
@@ -0,0 +1,311 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: scope-412
+#--------------------------------------------------
+Tez vertex scope-316	->	Tez vertex scope-318,Tez vertex scope-329,Tez vertex scope-340,
+Tez vertex scope-318	->	Tez vertex scope-321,Tez vertex scope-334,Tez vertex scope-345,
+Tez vertex scope-345	->	Tez vertex scope-348,Tez vertex scope-372,
+Tez vertex scope-348	->	Tez vertex scope-366,Tez vertex scope-356,
+Tez vertex scope-356	->	Tez vertex scope-366,
+Tez vertex scope-366	->	Tez vertex scope-368,
+Tez vertex scope-368
+Tez vertex scope-340	->	Tez vertex scope-343,Tez vertex scope-384,
+Tez vertex scope-384	->	Tez vertex scope-388,
+Tez vertex scope-372	->	Tez vertex scope-376,
+Tez vertex scope-376	->	Tez vertex scope-382,Tez vertex scope-386,
+Tez vertex scope-386	->	Tez vertex scope-388,
+Tez vertex scope-388
+Tez vertex scope-329	->	Tez vertex scope-332,Tez vertex scope-337,
+Tez vertex scope-337	->	Tez vertex scope-339,
+Tez vertex scope-339
+Tez vertex scope-382
+Tez vertex scope-321	->	Tez vertex scope-323,
+Tez vertex scope-323	->	Tez vertex scope-325,Tez vertex scope-327,
+Tez vertex scope-327
+Tez vertex scope-325
+Tez vertex scope-343
+Tez vertex scope-332	->	Tez vertex scope-336,
+Tez vertex scope-334	->	Tez vertex scope-336,
+Tez vertex scope-336
+
+Tez vertex scope-316
+# Plan on vertex
+POValueOutputTez - scope-317	->	 [scope-340, scope-318, scope-329]
+|
+|---a: New For Each(false,false)[bag] - scope-217
+    |   |
+    |   Cast[int] - scope-212
+    |   |
+    |   |---Project[bytearray][0] - scope-211
+    |   |
+    |   Cast[int] - scope-215
+    |   |
+    |   |---Project[bytearray][1] - scope-214
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-210
+Tez vertex scope-318
+# Plan on vertex
+POValueOutputTez - scope-320	->	 [scope-321, scope-345, scope-334]
+|
+|---b: Filter[bag] - scope-219
+    |   |
+    |   Less Than or Equal[boolean] - scope-222
+    |   |
+    |   |---Project[int][0] - scope-220
+    |   |
+    |   |---Constant(5) - scope-221
+    |
+    |---POValueInputTez - scope-319	<-	 scope-316
+Tez vertex scope-345
+# Plan on vertex
+POValueOutputTez - scope-347	->	 [scope-348, scope-372]
+|
+|---POValueInputTez - scope-346	<-	 scope-318
+Tez vertex scope-348
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-352	->	 scope-356
+|   |
+|   Constant(DummyVal) - scope-351
+|
+|---ReservoirSample - scope-355
+    |
+    |---New For Each(false)[tuple] - scope-354
+        |   |
+        |   Project[int][0] - scope-353
+        |
+        |---e1: Local Rearrange[tuple]{int}(false) - scope-350	->	 scope-366
+            |   |
+            |   Project[int][0] - scope-298
+            |
+            |---e: Filter[bag] - scope-294
+                |   |
+                |   Less Than[boolean] - scope-297
+                |   |
+                |   |---Project[int][0] - scope-295
+                |   |
+                |   |---Constant(3) - scope-296
+                |
+                |---POValueInputTez - scope-349	<-	 scope-345
+Tez vertex scope-356
+# Plan on vertex
+POValueOutputTez - scope-365	->	 [scope-366]
+|
+|---New For Each(false)[tuple] - scope-364
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-363
+    |   |
+    |   |---Project[tuple][*] - scope-362
+    |
+    |---New For Each(false,false)[tuple] - scope-361
+        |   |
+        |   Constant(1) - scope-360
+        |   |
+        |   Project[bag][1] - scope-358
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-357
+Tez vertex scope-366
+# Plan on vertex
+POIdentityInOutTez - scope-367	<-	 scope-348	->	 scope-368
+|   |
+|   Project[int][0] - scope-298
+Tez vertex scope-368
+# Plan on vertex
+e1: Store(file:///tmp/output/e1:org.apache.pig.builtin.PigStorage) - scope-300
+|
+|---New For Each(true)[tuple] - scope-371
+    |   |
+    |   Project[bag][1] - scope-370
+    |
+    |---Package(LitePackager)[tuple]{int} - scope-369
+Tez vertex scope-340
+# Plan on vertex
+POValueOutputTez - scope-342	->	 [scope-384, scope-343]
+|
+|---d1: Filter[bag] - scope-283
+    |   |
+    |   Equal To[boolean] - scope-286
+    |   |
+    |   |---Project[int][0] - scope-284
+    |   |
+    |   |---Constant(5) - scope-285
+    |
+    |---d: Filter[bag] - scope-279
+        |   |
+        |   Greater Than[boolean] - scope-282
+        |   |
+        |   |---Project[int][0] - scope-280
+        |   |
+        |   |---Constant(10) - scope-281
+        |
+        |---POValueInputTez - scope-341	<-	 scope-316
+Tez vertex scope-384
+# Plan on vertex
+POValueOutputTez - scope-390	->	 [scope-388]
+|
+|---POValueInputTez - scope-385	<-	 scope-340
+Tez vertex scope-372
+# Plan on vertex
+f1: Local Rearrange[tuple]{tuple}(false) - scope-375	->	 scope-376
+|   |
+|   Project[tuple][*] - scope-374
+|
+|---f1: Limit - scope-305
+    |
+    |---f: Filter[bag] - scope-301
+        |   |
+        |   Greater Than or Equal[boolean] - scope-304
+        |   |
+        |   |---Project[int][0] - scope-302
+        |   |
+        |   |---Constant(3) - scope-303
+        |
+        |---POValueInputTez - scope-373	<-	 scope-345
+Tez vertex scope-376
+# Plan on vertex
+POValueOutputTez - scope-381	->	 [scope-386, scope-382]
+|
+|---f1: Limit - scope-380
+    |
+    |---f1: New For Each(true)[bag] - scope-379
+        |   |
+        |   Project[tuple][1] - scope-378
+        |
+        |---f1: Package(Packager)[tuple]{tuple} - scope-377
+Tez vertex scope-386
+# Plan on vertex
+POValueOutputTez - scope-391	->	 [scope-388]
+|
+|---POValueInputTez - scope-387	<-	 scope-376
+Tez vertex scope-388
+# Plan on vertex
+f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-315
+|
+|---POShuffledValueInputTez - scope-389	<-	 [scope-386, scope-384]
+Tez vertex scope-329
+# Plan on vertex
+POValueOutputTez - scope-331	->	 [scope-337, scope-332]
+|
+|---c: Filter[bag] - scope-244
+    |   |
+    |   Less Than or Equal[boolean] - scope-247
+    |   |
+    |   |---Project[int][0] - scope-245
+    |   |
+    |   |---Constant(10) - scope-246
+    |
+    |---POValueInputTez - scope-330	<-	 scope-316
+Tez vertex scope-337
+# Plan on vertex
+c2: Local Rearrange[tuple]{int}(false) - scope-404	->	 scope-339
+|   |
+|   Project[int][0] - scope-406
+|
+|---c3: New For Each(false,false)[bag] - scope-392
+    |   |
+    |   Project[int][0] - scope-393
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.AlgebraicMathBase$Initial)[tuple] - scope-394
+    |   |
+    |   |---Project[bag][0] - scope-395
+    |       |
+    |       |---Project[bag][1] - scope-396
+    |
+    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-407
+        |
+        |---POValueInputTez - scope-338	<-	 scope-329
+Tez vertex scope-339
+# Combine plan on edge <scope-337>
+c2: Local Rearrange[tuple]{int}(false) - scope-408	->	 scope-339
+|   |
+|   Project[int][0] - scope-410
+|
+|---c3: New For Each(false,false)[bag] - scope-397
+    |   |
+    |   Project[int][0] - scope-398
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Intermediate)[tuple] - scope-399
+    |   |
+    |   |---Project[bag][1] - scope-400
+    |
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-403
+# Plan on vertex
+c3: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-278
+|
+|---c3: New For Each(false,false)[bag] - scope-277
+    |   |
+    |   Project[int][0] - scope-271
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.LongSum$Final)[long] - scope-275
+    |   |
+    |   |---Project[bag][1] - scope-401
+    |
+    |---c2: Package(CombinerPackager)[tuple]{int} - scope-268
+Tez vertex scope-382
+# Plan on vertex
+f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-309
+|
+|---POValueInputTez - scope-383	<-	 scope-376
+Tez vertex scope-321
+# Plan on vertex
+b1: Local Rearrange[tuple]{int}(false) - scope-228	->	 scope-323
+|   |
+|   Project[int][0] - scope-229
+|
+|---POValueInputTez - scope-322	<-	 scope-318
+Tez vertex scope-323
+# Plan on vertex
+POValueOutputTez - scope-324	->	 [scope-327, scope-325]
+|
+|---b1: Package(Packager)[tuple]{int} - scope-227
+Tez vertex scope-327
+# Plan on vertex
+b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-243
+|
+|---b2: New For Each(false,false)[bag] - scope-242
+    |   |
+    |   Project[int][0] - scope-236
+    |   |
+    |   POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-240
+    |   |
+    |   |---Project[bag][0] - scope-239
+    |       |
+    |       |---Project[bag][1] - scope-238
+    |
+    |---POValueInputTez - scope-328	<-	 scope-323
+Tez vertex scope-325
+# Plan on vertex
+b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-233
+|
+|---POValueInputTez - scope-326	<-	 scope-323
+Tez vertex scope-343
+# Plan on vertex
+d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-290
+|
+|---POValueInputTez - scope-344	<-	 scope-340
+Tez vertex scope-332
+# Plan on vertex
+c1: Local Rearrange[tuple]{int}(false) - scope-257	->	 scope-336
+|   |
+|   Project[int][0] - scope-258
+|
+|---POValueInputTez - scope-333	<-	 scope-329
+Tez vertex scope-334
+# Plan on vertex
+c1: Local Rearrange[tuple]{int}(false) - scope-259	->	 scope-336
+|   |
+|   Project[int][0] - scope-260
+|
+|---POValueInputTez - scope-335	<-	 scope-318
+Tez vertex scope-336
+# Plan on vertex
+c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-264
+|
+|---c1: New For Each(true,true)[tuple] - scope-263
+    |   |
+    |   Project[bag][1] - scope-261
+    |   |
+    |   Project[bag][2] - scope-262
+    |
+    |---c1: Package(Packager)[tuple]{int} - scope-256