You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/08/13 20:27:45 UTC

svn commit: r1617791 - in /pig/branches/maven: ./ pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/ pig/src/main/java/org/apache/pig/b...

Author: cheolsoo
Date: Wed Aug 13 18:27:45 2014
New Revision: 1617791

URL: http://svn.apache.org/r1617791
Log:
Merge latest trunk changes

Modified:
    pig/branches/maven/   (props changed)
    pig/branches/maven/CHANGES.txt
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java
    pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/branches/maven/pig/test/e2e/harness/TestDriver.pm
    pig/branches/maven/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)

Propchange: pig/branches/maven/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1617330-1617783

Modified: pig/branches/maven/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/maven/CHANGES.txt?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/CHANGES.txt (original)
+++ pig/branches/maven/CHANGES.txt Wed Aug 13 18:27:45 2014
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4117: Implement merge cogroup in Tez (daijy)
+
+PIG-4119: Add message at end of each testcase with timestamp in Pig system tests (nmaheshwari via daijy)
+
 PIG-4008: Pig code change to enable Tez Local mode (airbots via daijy)
 
 PIG-4091: Predicate pushdown for ORC (rohini via daijy)
@@ -56,6 +60,12 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4061: Make Streaming UDF work in Tez (daijy)
+
+PIG-4118: Fix hadoopversion 23 compilation due to TEZ-1237/TEZ-1407 (daijy)
+
+PIG-4109: register local jar fail on Windows when Pig script is remote (daijy)
+
 PIG-4116: Update Pig doc about Hadoop 2 Streaming Python UDF support (cheolsoo)
 
 PIG-4112: NPE in packager when union + group-by followed by replicated join in Tez (rohini via cheolsoo)

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Wed Aug 13 18:27:45 2014
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Properties;
@@ -51,14 +49,12 @@ import org.apache.pig.data.InternalCache
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 public class POMergeCogroup extends PhysicalOperator {
 
@@ -107,6 +103,8 @@ public class POMergeCogroup extends Phys
     // call or not.
     private transient boolean workingOnNewKey;
 
+    private byte endOfRecordMark = POStatus.STATUS_EOP;
+
     public POMergeCogroup(OperatorKey k,List<PhysicalOperator> inpPOs, 
             POLocalRearrange[] lrs, int parallel) {
 
@@ -115,6 +113,14 @@ public class POMergeCogroup extends Phys
         for(int i=0; i < lrs.length; i++)
             LRs[i].setStripKeyFromValue(false);
     }
+    /**
+     * Sets the status placed in the Result that marks the end of a record: POStatus.STATUS_EOP
+     * (the field default) for MR, POStatus.STATUS_NULL for Tez. MR sends EOP at the end of every
+     * record, whereas Tez only uses a single global EOP, so a record boundary is signalled with NULL.
+     */
+    public void setEndOfRecordMark(byte endOfRecordMark) {
+        this.endOfRecordMark = endOfRecordMark;
+    }
 
     @Override
     public Result getNextTuple() throws ExecException {
@@ -240,7 +246,7 @@ public class POMergeCogroup extends Phys
                     if( (null != tuple) && (Byte)tuple.get(0) == 0){
                         // Is key null for both tuples.
                         if(prevTopOfHeap.get(1) == null && tuple.get(1) == null){
-                            return new Result(POStatus.STATUS_EOP,null); 
+                            return new Result(endOfRecordMark,null); 
                         }
                         // Does key change from non-null to null or from one
                         // non-null to another non-null.
@@ -250,7 +256,7 @@ public class POMergeCogroup extends Phys
                     }
                     // Either top of heap is from different relation or it is
                     // from left relation but having the same key.
-                    return new Result(POStatus.STATUS_EOP,null); 
+                    return new Result(endOfRecordMark,null); 
                 }
                 
                 Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
@@ -317,7 +323,12 @@ public class POMergeCogroup extends Phys
     private void setup(Tuple firstRearrangedTup) throws IOException{
 
         // Read our own split Index.
-        int  curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+        int  curSplitIdx = 0;
+        if (PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX)!=null) {
+            curSplitIdx = Integer.parseInt(PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX));
+        } else {
+            curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+        }
         Object firstBaseKey = firstRearrangedTup.get(1);
         List<Pair<Integer,Tuple>> index = readIndex();
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Wed Aug 13 18:27:45 2014
@@ -25,13 +25,17 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRReader;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -78,6 +82,12 @@ public class POSimpleTezLoad extends POL
         input = (MRInput) logInput;
         try {
             reader = input.getReader();
+            // Set split index, MergeCoGroup need it. And this input is the only input of the
+            // MergeCoGroup vertex.
+            if (reader instanceof MRReader) {
+                int splitIndex = ((PigSplit)((MRReader)reader).getSplit()).getSplitIndex();
+                PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, splitIndex);
+            }
         } catch (IOException e) {
             throw new ExecException(e);
         }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Wed Aug 13 18:27:45 2014
@@ -32,7 +32,7 @@ import org.apache.tez.common.counters.Co
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
@@ -70,7 +70,7 @@ public class POStoreTez extends POStore 
     }
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
+    public void initialize(ProcessorContext processorContext)
             throws ExecException {
         if (isMultiStore()) {
             CounterGroup multiStoreGroup = processorContext.getCounters()

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Wed Aug 13 18:27:45 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 public class POValueOutputTez extends PhysicalOperator implements TezOutput, TezTaskConfigurable {
@@ -81,7 +81,7 @@ public class POValueOutputTez extends Ph
     }
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
+    public void initialize(ProcessorContext processorContext)
             throws ExecException {
         taskIndex = processorContext.getTaskIndex();
     }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Wed Aug 13 18:27:45 2014
@@ -23,7 +23,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -87,11 +87,11 @@ public class PartitionerDefinedVertexMan
             if (dynamicParallelism!=currentParallelism) {
                 LOG.info("Pig Partitioner Defined Vertex Manager: reset parallelism to " + dynamicParallelism
                         + " from " + currentParallelism);
-                Map<String, EdgeManagerDescriptor> edgeManagers =
-                    new HashMap<String, EdgeManagerDescriptor>();
+                Map<String, EdgeManagerPluginDescriptor> edgeManagers =
+                    new HashMap<String, EdgeManagerPluginDescriptor>();
                 for(String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
-                    EdgeManagerDescriptor edgeManagerDescriptor =
-                            new EdgeManagerDescriptor(ScatterGatherEdgeManager.class.getName());
+                    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+                            new EdgeManagerPluginDescriptor(ScatterGatherEdgeManager.class.getName());
                     edgeManagers.put(vertex, edgeManagerDescriptor);
                 }
                 getContext().setVertexParallelism(dynamicParallelism, null, edgeManagers, null);

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Wed Aug 13 18:27:45 2014
@@ -57,7 +57,7 @@ import org.apache.tez.runtime.api.Abstra
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -96,7 +96,7 @@ public class PigProcessor extends Abstra
     public static String sampleVertex;
     public static Map<String, Object> sampleMap;
 
-    public PigProcessor(TezProcessorContext context) {
+    public PigProcessor(ProcessorContext context) {
         super(context);
     }
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Aug 13 18:27:45 2014
@@ -43,7 +43,6 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -583,7 +582,7 @@ public class TezCompiler extends PhyPlan
         } catch (IOException e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
         }
 
         try{
@@ -592,7 +591,7 @@ public class TezCompiler extends PhyPlan
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
 
@@ -874,10 +873,185 @@ public class TezCompiler extends PhyPlan
     }
 
     @Override
-    public void visitMergeCoGroup(POMergeCogroup op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+        // compiledInputs[0] is the base relation (its loader must be a CollectableLoadFunc); the
+        // other inputs are side relations whose IndexableLoadFuncs are handed to POMergeCogroup.
+        if(compiledInputs.length < 2){
+            int errCode=2251;
+            String errMsg = "Merge Cogroup works on two or more relations. " +
+                    "To use map-side group-by on single relation, use 'collected' qualifier.";
+            throw new TezCompilerException(errMsg, errCode);
+        }
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
+        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
+        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
+        try{
+            // Tez uses one global EOP, so the end of each record is marked with STATUS_NULL.
+            poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+            // Iterate through all the TezOperators, remove the side TezOperators from the
+            // plan and collect the loader information POMergeCogroup needs in separate lists.
+
+            for(int i=0 ; i < compiledInputs.length; i++){
+
+                TezOperator tezOper = compiledInputs[i];
+                PhysicalPlan plan = tezOper.plan;
+                if(plan.getRoots().size() != 1){
+                    int errCode = 2171;
+                    String errMsg = "Expected one root physical operator in physical plan but found " + plan.getRoots().size();
+                    throw new TezCompilerException(errMsg,errCode,PigException.BUG);
+                }
+
+                PhysicalOperator rootPOOp = plan.getRoots().get(0);
+                if(! (rootPOOp instanceof POLoad)){
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
+                    throw new TezCompilerException(errMsg,errCode);
+                }
+
+                POLoad sideLoader = (POLoad)rootPOOp;
+                FileSpec loadFileSpec = sideLoader.getLFile();
+                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+                LoadFunc loadfunc = sideLoader.getLoadFunc();
+                if(i == 0){
+                    // The base relation stays in the plan; it must keep all instances of a key in one split.
+                    if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
+                        int errCode = 2252;
+                        throw new TezCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
+                    }
+
+                    ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
+                    continue;
+                }
+                if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
+                    int errCode = 2253;
+                    throw new TezCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
+                }
+
+                funcSpecs.add(funcSpec);
+                fileSpecs.add(loadFileSpec.getFileName());
+                loaderSigns.add(sideLoader.getSignature());
+                tezPlan.remove(tezOper);
+            }
+
+            poCoGrp.setSideLoadFuncs(funcSpecs);
+            poCoGrp.setSideFileSpecs(fileSpecs);
+            poCoGrp.setLoaderSignatures(loaderSigns);
+
+            // Use tez operator of base relation for the cogroup operation.
+            TezOperator baseTezOp = phyToTezOpMap.get(poCoGrp.getInputs().get(0));
+            if(baseTezOp.isClosed()){
+                int errCode = 2254;
+                throw new TezCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
+            }
+
+            // Create a new Tez operator for the indexing vertex and let getIndexingJob() configure it.
+            TezOperator indexerTezOp = getTezOp();
+            FileSpec idxFileSpec = getIndexingJob(indexerTezOp, baseTezOp, poCoGrp.getLRInnerPlansOf(0));
+            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+            baseTezOp.plan.addAsLeaf(poCoGrp);
+            for (FuncSpec funcSpec : funcSpecs) {
+                baseTezOp.UDFs.add(funcSpec.toString());
+            }
+            phyToTezOpMap.put(poCoGrp,baseTezOp);
+            // Going forward, new operators should be added to baseTezOp. To make
+            // sure, reset curTezOp.
+            curTezOp = baseTezOp;
+        }
+        catch (ExecException e){
+            throw new TezCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+        }
+        catch (TezCompilerException tce){
+            throw(tce);
+        }
+        catch (CloneNotSupportedException e) {
+            throw new TezCompilerException(e);
+        }
+        catch(PlanException e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        catch (IOException e){
+            int errCode = 3000;
+            String errMsg = "IOException caught while compiling POMergeCoGroup";
+            throw new TezCompilerException(errMsg, errCode,e);
+        }
+    }
+
+    /**
+     * Sets up the indexing vertices for a map-side cogroup: indexerTezOp reads the base
+     * relation's file through MergeJoinIndexer, and a single aggregating vertex stores the
+     * sorted index entries to a temp file; that vertex is connected to baseTezOp.
+     * @param indexerTezOp empty operator that becomes the indexing (load) vertex
+     * @param baseTezOp operator holding the base relation's plan; its root must be a POLoad
+     * @param mapperLRInnerPlans key plans of the base relation's local rearrange
+     * @return spec of the temp file the index is stored to
+     */
+    private FileSpec getIndexingJob(TezOperator indexerTezOp,
+            final TezOperator baseTezOp, final List<PhysicalPlan> mapperLRInnerPlans)
+        throws TezCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+        // The indexer vertex reads the base relation's file through MergeJoinIndexer.
+        PhysicalPlan basePlan = baseTezOp.plan;
+        POLoad baseLoader = (POLoad)basePlan.getRoots().get(0);
+        FileSpec origLoaderFileSpec = baseLoader.getLFile();
+        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+        LoadFunc loadFunc = baseLoader.getLoadFunc();
+        if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+            int errCode = 1104;
+            String errMsg = "Base relation of merge-coGroup must implement " +
+            "OrderedLoadFunc interface. The specified loader "
+            + funcSpec + " doesn't implement it";
+            throw new TezCompilerException(errMsg,errCode);
+        }
+
+        String[] indexerArgs = new String[6];
+        indexerArgs[0] = funcSpec.toString();
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+        indexerArgs[3] = baseLoader.getSignature();
+        indexerArgs[4] = baseLoader.getOperatorKey().scope;
+        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+        // indexerArgs[2]: the operators after the base loader, if any, as the indexer's inner plan.
+        PhysicalPlan phyPlan;
+        if (basePlan.getSuccessors(baseLoader) == null
+                || basePlan.getSuccessors(baseLoader).isEmpty()){
+            // Load-Load-Cogroup case.
+            phyPlan = null;
+        } else { // We got something. Yank it and set it as inner plan.
+            phyPlan = basePlan.clone();
+            PhysicalOperator root = phyPlan.getRoots().get(0);
+            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+            phyPlan.remove(root);
+        }
+        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+        POLoad idxJobLoader = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        idxJobLoader.setPc(pigContext);
+        idxJobLoader.setIsTmpLoad(true);
+        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+        indexerTezOp.plan.add(idxJobLoader);
+        indexerTezOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+        // The indexer loader returns tuples of the form
+        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+        // After getting an index entry in each indexer task, send all of them to one
+        // vertex where they will be sorted on the way.
+        TezOperator indexAggrOper = getTezOp();
+        tezPlan.add(indexAggrOper);
+        tezPlan.add(indexerTezOp);
+        TezCompilerUtil.simpleConnectTwoVertex(tezPlan, indexerTezOp, indexAggrOper, scope, nig);
+        TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp);
+        indexAggrOper.segmentBelow = true;
+        // All index entries must meet in one task, so the aggregating vertex (the indexing
+        // job's "reducer") is the one that needs parallelism 1, not the indexer vertex.
+        indexAggrOper.setRequestedParallelism(1);
+        POStore st = TezCompilerUtil.getStore(scope, nig);
+        FileSpec strFile = getTempFileSpec();
+        st.setSFile(strFile);
+        indexAggrOper.plan.addAsLeaf(st);
+        indexAggrOper.setClosed(true);
+        return strFile;
     }
 
     /** Since merge-join works on two inputs there are exactly two TezOper predecessors identified  as left and right.
@@ -902,7 +1076,7 @@ public class TezCompiler extends PhyPlan
             joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
             if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
                 int errCode=1101;
-                throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+                throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
             }
 
             curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
@@ -922,14 +1096,14 @@ public class TezCompiler extends PhyPlan
                 if(rightPlan.getRoots().size() != 1){
                     int errCode = 2171;
                     String errMsg = "Expected one but found more then one root physical operator in physical plan.";
-                    throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+                    throw new TezCompilerException(errMsg,errCode,PigException.BUG);
                 }
 
                 PhysicalOperator rightLoader = rightPlan.getRoots().get(0);
                 if(! (rightLoader instanceof POLoad)){
                     int errCode = 2172;
                     String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
-                    throw new MRCompilerException(errMsg,errCode);
+                    throw new TezCompilerException(errMsg,errCode);
                 }
 
                 if (rightPlan.getSuccessors(rightLoader) == null || rightPlan.getSuccessors(rightLoader).isEmpty())
@@ -988,7 +1162,7 @@ public class TezCompiler extends PhyPlan
                                 int errCode = 1106;
                                 String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
                                 rightLoader.getLFile().getFuncSpec() + " as the loader";
-                                throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+                                throw new TezCompilerException(errMsg, errCode, PigException.INPUT);
                             }
                         }
                     }
@@ -1003,7 +1177,7 @@ public class TezCompiler extends PhyPlan
                     int errCode = 1104;
                     String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
                     "The specified loader " + loadFunc + " doesn't implement it";
-                    throw new MRCompilerException(errMsg,errCode);
+                    throw new TezCompilerException(errMsg,errCode);
                 }
 
                 // Replace POLoad with  indexer.
@@ -1013,7 +1187,7 @@ public class TezCompiler extends PhyPlan
                     String errMsg = "Right input of merge-join must implement " +
                     "OrderedLoadFunc interface. The specified loader "
                     + loadFunc + " doesn't implement it";
-                    throw new MRCompilerException(errMsg,errCode);
+                    throw new TezCompilerException(errMsg,errCode);
                 }
 
                 String[] indexerArgs = new String[6];
@@ -1083,17 +1257,17 @@ public class TezCompiler extends PhyPlan
         catch(PlanException e){
             int errCode = 2034;
             String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
         }
        catch (IOException e){
            int errCode = 3000;
            String errMsg = "IOException caught while compiling POMergeJoin";
-            throw new MRCompilerException(errMsg, errCode,e);
+            throw new TezCompilerException(errMsg, errCode,e);
         }
        catch(CloneNotSupportedException e){
            int errCode = 2127;
            String errMsg = "Cloning exception caught while compiling POMergeJoin";
-           throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+           throw new TezCompilerException(errMsg, errCode, PigException.BUG, e);
        }
     }
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Aug 13 18:27:45 2014
@@ -107,7 +107,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.GroupInputEdge;
@@ -129,6 +129,7 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInput;
@@ -342,7 +343,7 @@ public class TezDagBuilder extends TezOp
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
-            return new EdgeProperty((EdgeManagerDescriptor)null,
+            return new EdgeProperty((EdgeManagerPluginDescriptor)null,
                     edge.dataSourceType, edge.schedulingType, out, in);
             }
 
@@ -596,9 +597,9 @@ public class TezDagBuilder extends TezOp
             vertex.setLocationHint(new VertexLocationHint(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
                     new DataSourceDescriptor(new InputDescriptor(MRInput.class.getName())
-                            .setUserPayload(MRHelpers.createMRInputPayload(
-                                    payloadConf,
-                                    tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto())),
+                            .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+                            .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
+                            .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteArray()),
                     new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
         }
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Wed Aug 13 18:27:45 2014
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.StreamingUDF;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -91,6 +92,11 @@ public class TezPlanContainer extends Op
                 }
                 URI jarUri = new File(jarName).toURI();
                 jarLists.add(jarUri);
+                if ("StreamingUDF".equals(clazz.getSimpleName())) {
+                    for (String fileName : StreamingUDF.getResourcesForJar()) {
+                        jarLists.add(new File(fileName).toURI());
+                    }
+                }
             }
         }
 
@@ -99,14 +105,6 @@ public class TezPlanContainer extends Op
             jarLists.add(scriptUDFJarFile.toURI());
         }
 
-        // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
-        // resources for them yet.
-        // if ("StreamingUDF".equals(clazz.getSimpleName())) {
-        //     for (String fileName : StreamingUDF.getResourcesForJar()) {
-        //         jarLists.add(new File(fileName).toURI().toURL());
-        //     }
-        // }
-
         return TezResourceManager.getInstance().addTezResources(jarLists);
     }
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Wed Aug 13 18:27:45 2014
@@ -19,7 +19,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * This interface is implemented by PhysicalOperators that can need to access
@@ -28,6 +28,6 @@ import org.apache.tez.runtime.api.TezPro
 
 public interface TezTaskConfigurable {
 
-    public void initialize(TezProcessorContext processorContext) throws ExecException;
+    public void initialize(ProcessorContext processorContext) throws ExecException;
 
 }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java Wed Aug 13 18:27:45 2014
@@ -21,17 +21,17 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.hadoop.mapred.MRCounters.MRCounter;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
-public class TezTaskContext extends TaskContext<TezProcessorContext> {
-    private TezProcessorContext context;
+public class TezTaskContext extends TaskContext<ProcessorContext> {
+    private ProcessorContext context;
 
-    public TezTaskContext(TezProcessorContext context) {
+    public TezTaskContext(ProcessorContext context) {
         this.context = context;
     }
 
     @Override
-    public TezProcessorContext get() {
+    public ProcessorContext get() {
         return context;
     }
 

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Wed Aug 13 18:27:45 2014
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
@@ -56,7 +56,7 @@ public class POCounterTez extends POCoun
     }
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
+    public void initialize(ProcessorContext processorContext)
             throws ExecException {
         this.setTaskId(processorContext.getTaskIndex());
     }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Wed Aug 13 18:27:45 2014
@@ -41,7 +41,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 
 public class LoaderProcessor extends TezOpPlanVisitor {
     private Configuration conf;
@@ -130,7 +130,7 @@ public class LoaderProcessor extends Tez
             tezOp.getLoaderInfo().setInpLimits(inpLimits);
             // Not using MRInputAMSplitGenerator because delegation tokens are
             // fetched in FileInputFormat
-            tezOp.getLoaderInfo().setInputSplitInfo(MRHelpers.generateInputSplitsToMem(conf, false, 0));
+            tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
         }
         return lds;
     }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java Wed Aug 13 18:27:45 2014
@@ -41,4 +41,9 @@ public class PigImplConstants {
      * Used by pig to indicate that current job has been converted to run in local mode
      */
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
+
+    /**
+     * Indicate the split index of the task. Used by merge cogroup
+     */
+    public static final String PIG_SPLIT_INDEX = "pig.split.index";
 }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java Wed Aug 13 18:27:45 2014
@@ -798,6 +798,8 @@ public class FileLocalizer {
         FileSystem srcFs;
         if ( (!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))
                 && uri.getScheme() == null )||
+                // For Windows local files
+                (uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
                 uri.getScheme().equals("local") ) {
             srcFs = localFs;
         } else {

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java Wed Aug 13 18:27:45 2014
@@ -66,7 +66,13 @@ public class ScriptingOutputCapturer {
         
         String jobId = conf.get(MRConfiguration.JOB_ID);
         String taskId = conf.get(MRConfiguration.TASK_ID);
-        String hadoopLogDir = System.getProperty("hadoop.log.dir");
+        String hadoopLogDir = System.getProperty("yarn.app.container.log.dir");
+        if (hadoopLogDir == null) {
+            hadoopLogDir = conf.get("yarn.app.container.log.dir");
+        }
+        if (hadoopLogDir == null) {
+            hadoopLogDir = System.getProperty("hadoop.log.dir");
+        }
         if (hadoopLogDir == null) {
             hadoopLogDir = conf.get("hadoop.log.dir");
         }

Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Wed Aug 13 18:27:45 2014
@@ -115,6 +115,9 @@ public class TezTaskStats extends JobSta
         }
 
         for (POStore sto : stores) {
+            if (sto.isTmpStore()) {
+                continue;
+            }
             long records = -1;
             long hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
             String filename = sto.getSFile().getFileName();

Modified: pig/branches/maven/pig/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/test/e2e/harness/TestDriver.pm?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/test/e2e/harness/TestDriver.pm (original)
+++ pig/branches/maven/pig/test/e2e/harness/TestDriver.pm Wed Aug 13 18:27:45 2014
@@ -717,8 +717,7 @@ sub runTestGroup() {
 					$testStatuses->{$testName} = $failedStr;
 
 				}
-				$msg= "$msg at " . time . "\n";
-				#print $msg;
+                                $msg .= "\nEnding test $testName at " . time ."\n";
 				print $subLog $msg;
 				$duration = $endTime - $beginTime;
 				$dbinfo{'duration'} = $duration;
@@ -728,6 +727,7 @@ sub runTestGroup() {
 
 			if ($@) {
 				$msg= "ERROR $subName at : ".__LINE__." Failed to run test $testName <$@>\n";
+                                $msg .= "\nEnding test $testName at " . time ."\n";
 				#print $msg;
 				print $subLog $msg;
 				$testStatuses->{$testName} = $abortedStr;

Propchange: pig/branches/maven/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
  Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1617330-1617783