You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/08/13 20:27:45 UTC
svn commit: r1617791 - in /pig/branches/maven: ./
pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/
pig/src/main/java/org/apache/pig/b...
Author: cheolsoo
Date: Wed Aug 13 18:27:45 2014
New Revision: 1617791
URL: http://svn.apache.org/r1617791
Log:
Merge latest trunk changes
Modified:
pig/branches/maven/ (props changed)
pig/branches/maven/CHANGES.txt
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java
pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java
pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java
pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/branches/maven/pig/test/e2e/harness/TestDriver.pm
pig/branches/maven/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
Propchange: pig/branches/maven/
------------------------------------------------------------------------------
Merged /pig/trunk:r1617330-1617783
Modified: pig/branches/maven/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/maven/CHANGES.txt?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/CHANGES.txt (original)
+++ pig/branches/maven/CHANGES.txt Wed Aug 13 18:27:45 2014
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4117: Implement merge cogroup in Tez (daijy)
+
+PIG-4119: Add message at end of each testcase with timestamp in Pig system tests (nmaheshwari via daijy)
+
PIG-4008: Pig code change to enable Tez Local mode (airbots via daijy)
PIG-4091: Predicate pushdown for ORC (rohini via daijy)
@@ -56,6 +60,12 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4061: Make Streaming UDF work in Tez (daijy)
+
+PIG-4118: Fix hadoopversion 23 compilation due to TEZ-1237/TEZ-1407 (daijy)
+
+PIG-4109: register local jar fail on Windows when Pig script is remote (daijy)
+
PIG-4116: Update Pig doc about Hadoop 2 Streaming Python UDF support (cheolsoo)
PIG-4112: NPE in packager when union + group-by followed by replicated join in Tez (rohini via cheolsoo)
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Wed Aug 13 18:27:45 2014
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Properties;
@@ -51,14 +49,12 @@ import org.apache.pig.data.InternalCache
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
public class POMergeCogroup extends PhysicalOperator {
@@ -107,6 +103,8 @@ public class POMergeCogroup extends Phys
// call or not.
private transient boolean workingOnNewKey;
+ private byte endOfRecordMark = POStatus.STATUS_EOP;
+
public POMergeCogroup(OperatorKey k,List<PhysicalOperator> inpPOs,
POLocalRearrange[] lrs, int parallel) {
@@ -115,6 +113,14 @@ public class POMergeCogroup extends Phys
for(int i=0; i < lrs.length; i++)
LRs[i].setStripKeyFromValue(false);
}
+
+ // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
+ // This is because:
+ // For MR, we send EOP at the end of every record
+ // For Tez, we only use a global EOP, so send NULL for end of record
+ public void setEndOfRecordMark(byte endOfRecordMark) {
+ this.endOfRecordMark = endOfRecordMark;
+ }
@Override
public Result getNextTuple() throws ExecException {
@@ -240,7 +246,7 @@ public class POMergeCogroup extends Phys
if( (null != tuple) && (Byte)tuple.get(0) == 0){
// Is key null for both tuples.
if(prevTopOfHeap.get(1) == null && tuple.get(1) == null){
- return new Result(POStatus.STATUS_EOP,null);
+ return new Result(endOfRecordMark,null);
}
// Does key change from non-null to null or from one
// non-null to another non-null.
@@ -250,7 +256,7 @@ public class POMergeCogroup extends Phys
}
// Either top of heap is from different relation or it is
// from left relation but having the same key.
- return new Result(POStatus.STATUS_EOP,null);
+ return new Result(endOfRecordMark,null);
}
Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
@@ -317,7 +323,12 @@ public class POMergeCogroup extends Phys
private void setup(Tuple firstRearrangedTup) throws IOException{
// Read our own split Index.
- int curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+ int curSplitIdx = 0;
+ if (PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX)!=null) {
+ curSplitIdx = Integer.parseInt(PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX));
+ } else {
+ curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+ }
Object firstBaseKey = firstRearrangedTup.get(1);
List<Pair<Integer,Tuple>> index = readIndex();
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Wed Aug 13 18:27:45 2014
@@ -25,13 +25,17 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.lib.MRReader;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -78,6 +82,12 @@ public class POSimpleTezLoad extends POL
input = (MRInput) logInput;
try {
reader = input.getReader();
+ // Set split index, MergeCoGroup need it. And this input is the only input of the
+ // MergeCoGroup vertex.
+ if (reader instanceof MRReader) {
+ int splitIndex = ((PigSplit)((MRReader)reader).getSplit()).getSplitIndex();
+ PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, splitIndex);
+ }
} catch (IOException e) {
throw new ExecException(e);
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Wed Aug 13 18:27:45 2014
@@ -32,7 +32,7 @@ import org.apache.tez.common.counters.Co
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
@@ -70,7 +70,7 @@ public class POStoreTez extends POStore
}
@Override
- public void initialize(TezProcessorContext processorContext)
+ public void initialize(ProcessorContext processorContext)
throws ExecException {
if (isMultiStore()) {
CounterGroup multiStoreGroup = processorContext.getCounters()
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Wed Aug 13 18:27:45 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
public class POValueOutputTez extends PhysicalOperator implements TezOutput, TezTaskConfigurable {
@@ -81,7 +81,7 @@ public class POValueOutputTez extends Ph
}
@Override
- public void initialize(TezProcessorContext processorContext)
+ public void initialize(ProcessorContext processorContext)
throws ExecException {
taskIndex = processorContext.getTaskIndex();
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Wed Aug 13 18:27:45 2014
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -87,11 +87,11 @@ public class PartitionerDefinedVertexMan
if (dynamicParallelism!=currentParallelism) {
LOG.info("Pig Partitioner Defined Vertex Manager: reset parallelism to " + dynamicParallelism
+ " from " + currentParallelism);
- Map<String, EdgeManagerDescriptor> edgeManagers =
- new HashMap<String, EdgeManagerDescriptor>();
+ Map<String, EdgeManagerPluginDescriptor> edgeManagers =
+ new HashMap<String, EdgeManagerPluginDescriptor>();
for(String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
- EdgeManagerDescriptor edgeManagerDescriptor =
- new EdgeManagerDescriptor(ScatterGatherEdgeManager.class.getName());
+ EdgeManagerPluginDescriptor edgeManagerDescriptor =
+ new EdgeManagerPluginDescriptor(ScatterGatherEdgeManager.class.getName());
edgeManagers.put(vertex, edgeManagerDescriptor);
}
getContext().setVertexParallelism(dynamicParallelism, null, edgeManagers, null);
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Wed Aug 13 18:27:45 2014
@@ -57,7 +57,7 @@ import org.apache.tez.runtime.api.Abstra
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -96,7 +96,7 @@ public class PigProcessor extends Abstra
public static String sampleVertex;
public static Map<String, Object> sampleMap;
- public PigProcessor(TezProcessorContext context) {
+ public PigProcessor(ProcessorContext context) {
super(context);
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Wed Aug 13 18:27:45 2014
@@ -43,7 +43,6 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -583,7 +582,7 @@ public class TezCompiler extends PhyPlan
} catch (IOException e) {
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
}
try{
@@ -592,7 +591,7 @@ public class TezCompiler extends PhyPlan
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
}
}
@@ -874,10 +873,185 @@ public class TezCompiler extends PhyPlan
}
@Override
- public void visitMergeCoGroup(POMergeCogroup op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+ public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+ if(compiledInputs.length < 2){
+ int errCode=2251;
+ String errMsg = "Merge Cogroup work on two or more relations." +
+ "To use map-side group-by on single relation, use 'collected' qualifier.";
+ throw new TezCompilerException(errMsg, errCode);
+ }
+
+ List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
+ List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
+ List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
+
+ try{
+ poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+
+ // Iterate through all the TezOpererators, disconnect side TezOperators from
+ // TezOperator and collect all the information needed in different lists.
+
+ for(int i=0 ; i < compiledInputs.length; i++){
+
+ TezOperator tezOper = compiledInputs[i];
+ PhysicalPlan plan = tezOper.plan;
+ if(plan.getRoots().size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+ throw new TezCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator rootPOOp = plan.getRoots().get(0);
+ if(! (rootPOOp instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
+ throw new TezCompilerException(errMsg,errCode);
+ }
+
+ POLoad sideLoader = (POLoad)rootPOOp;
+ FileSpec loadFileSpec = sideLoader.getLFile();
+ FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+ LoadFunc loadfunc = sideLoader.getLoadFunc();
+ if(i == 0){
+
+ if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
+ int errCode = 2252;
+ throw new TezCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
+ }
+
+ ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
+ continue;
+ }
+ if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
+ int errCode = 2253;
+ throw new TezCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
+ }
+
+ funcSpecs.add(funcSpec);
+ fileSpecs.add(loadFileSpec.getFileName());
+ loaderSigns.add(sideLoader.getSignature());
+ tezPlan.remove(tezOper);
+ }
+
+ poCoGrp.setSideLoadFuncs(funcSpecs);
+ poCoGrp.setSideFileSpecs(fileSpecs);
+ poCoGrp.setLoaderSignatures(loaderSigns);
+
+ // Use tez operator of base relation for the cogroup operation.
+ TezOperator baseMROp = phyToTezOpMap.get(poCoGrp.getInputs().get(0));
+ if(baseMROp.isClosed()){
+ int errCode = 2254;
+ throw new TezCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
+ }
+
+ // Create new map-reduce operator for indexing job and then configure it.
+ TezOperator indexerTezOp = getTezOp();
+ FileSpec idxFileSpec = getIndexingJob(indexerTezOp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
+ poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+ poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+ baseMROp.plan.addAsLeaf(poCoGrp);
+ for (FuncSpec funcSpec : funcSpecs)
+ baseMROp.UDFs.add(funcSpec.toString());
+
+ phyToTezOpMap.put(poCoGrp,baseMROp);
+ // Going forward, new operators should be added in baseMRop. To make
+ // sure, reset curMROp.
+ curTezOp = baseMROp;
+ }
+ catch (ExecException e){
+ throw new TezCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+ }
+ catch (TezCompilerException mrce){
+ throw(mrce);
+ }
+ catch (CloneNotSupportedException e) {
+ throw new TezCompilerException(e);
+ }
+ catch(PlanException e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ catch (IOException e){
+ int errCode = 3000;
+ String errMsg = "IOException caught while compiling POMergeCoGroup";
+ throw new TezCompilerException(errMsg, errCode,e);
+ }
+ }
+
+ // Sets up the indexing job for map-side cogroups.
+ private FileSpec getIndexingJob(TezOperator indexerTezOp,
+ final TezOperator baseTezOp, final List<PhysicalPlan> mapperLRInnerPlans)
+ throws TezCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+
+ // First replace loader with MergeJoinIndexer.
+ PhysicalPlan basePlan = baseTezOp.plan;
+ POLoad baseLoader = (POLoad)basePlan.getRoots().get(0);
+ FileSpec origLoaderFileSpec = baseLoader.getLFile();
+ FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+ LoadFunc loadFunc = baseLoader.getLoadFunc();
+
+ if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+ int errCode = 1104;
+ String errMsg = "Base relation of merge-coGroup must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + funcSpec + " doesn't implement it";
+ throw new TezCompilerException(errMsg,errCode);
+ }
+
+ String[] indexerArgs = new String[6];
+ indexerArgs[0] = funcSpec.toString();
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+ indexerArgs[3] = baseLoader.getSignature();
+ indexerArgs[4] = baseLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+ PhysicalPlan phyPlan;
+ if (basePlan.getSuccessors(baseLoader) == null
+ || basePlan.getSuccessors(baseLoader).isEmpty()){
+ // Load-Load-Cogroup case.
+ phyPlan = null;
+ }
+
+ else{ // We got something. Yank it and set it as inner plan.
+ phyPlan = basePlan.clone();
+ PhysicalOperator root = phyPlan.getRoots().get(0);
+ phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+ phyPlan.remove(root);
+
+ }
+ indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+ POLoad idxJobLoader = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ idxJobLoader.setPc(pigContext);
+ idxJobLoader.setIsTmpLoad(true);
+ idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+ new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+ indexerTezOp.plan.add(idxJobLoader);
+ indexerTezOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+
+ // Loader of mro will return a tuple of form -
+ // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+
+ // After getting an index entry in each mapper, send all of them to one
+ // vertex where they will be sorted on the way by Hadoop.
+ TezOperator indexAggrOper = getTezOp();
+ tezPlan.add(indexAggrOper);
+ tezPlan.add(indexerTezOp);
+ TezCompilerUtil.simpleConnectTwoVertex(tezPlan, indexerTezOp, indexAggrOper, scope, nig);
+ TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp);
+ indexAggrOper.segmentBelow = true;
+
+ indexerTezOp.setRequestedParallelism(1); // we need exactly one reducer for indexing job.
+
+ POStore st = TezCompilerUtil.getStore(scope, nig);
+ FileSpec strFile = getTempFileSpec();
+ st.setSFile(strFile);
+ indexAggrOper.plan.addAsLeaf(st);
+ indexAggrOper.setClosed(true);
+
+ return strFile;
}
/** Since merge-join works on two inputs there are exactly two TezOper predecessors identified as left and right.
@@ -902,7 +1076,7 @@ public class TezCompiler extends PhyPlan
joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
int errCode=1101;
- throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+ throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
}
curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
@@ -922,14 +1096,14 @@ public class TezCompiler extends PhyPlan
if(rightPlan.getRoots().size() != 1){
int errCode = 2171;
String errMsg = "Expected one but found more then one root physical operator in physical plan.";
- throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ throw new TezCompilerException(errMsg,errCode,PigException.BUG);
}
PhysicalOperator rightLoader = rightPlan.getRoots().get(0);
if(! (rightLoader instanceof POLoad)){
int errCode = 2172;
String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
- throw new MRCompilerException(errMsg,errCode);
+ throw new TezCompilerException(errMsg,errCode);
}
if (rightPlan.getSuccessors(rightLoader) == null || rightPlan.getSuccessors(rightLoader).isEmpty())
@@ -988,7 +1162,7 @@ public class TezCompiler extends PhyPlan
int errCode = 1106;
String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
rightLoader.getLFile().getFuncSpec() + " as the loader";
- throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+ throw new TezCompilerException(errMsg, errCode, PigException.INPUT);
}
}
}
@@ -1003,7 +1177,7 @@ public class TezCompiler extends PhyPlan
int errCode = 1104;
String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
"The specified loader " + loadFunc + " doesn't implement it";
- throw new MRCompilerException(errMsg,errCode);
+ throw new TezCompilerException(errMsg,errCode);
}
// Replace POLoad with indexer.
@@ -1013,7 +1187,7 @@ public class TezCompiler extends PhyPlan
String errMsg = "Right input of merge-join must implement " +
"OrderedLoadFunc interface. The specified loader "
+ loadFunc + " doesn't implement it";
- throw new MRCompilerException(errMsg,errCode);
+ throw new TezCompilerException(errMsg,errCode);
}
String[] indexerArgs = new String[6];
@@ -1083,17 +1257,17 @@ public class TezCompiler extends PhyPlan
catch(PlanException e){
int errCode = 2034;
String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ throw new TezCompilerException(msg, errCode, PigException.BUG, e);
}
catch (IOException e){
int errCode = 3000;
String errMsg = "IOException caught while compiling POMergeJoin";
- throw new MRCompilerException(errMsg, errCode,e);
+ throw new TezCompilerException(errMsg, errCode,e);
}
catch(CloneNotSupportedException e){
int errCode = 2127;
String errMsg = "Cloning exception caught while compiling POMergeJoin";
- throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+ throw new TezCompilerException(errMsg, errCode, PigException.BUG, e);
}
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Aug 13 18:27:45 2014
@@ -107,7 +107,7 @@ import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.GroupInputEdge;
@@ -129,6 +129,7 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
@@ -342,7 +343,7 @@ public class TezDagBuilder extends TezOp
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
- return new EdgeProperty((EdgeManagerDescriptor)null,
+ return new EdgeProperty((EdgeManagerPluginDescriptor)null,
edge.dataSourceType, edge.schedulingType, out, in);
}
@@ -596,9 +597,9 @@ public class TezDagBuilder extends TezOp
vertex.setLocationHint(new VertexLocationHint(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
new DataSourceDescriptor(new InputDescriptor(MRInput.class.getName())
- .setUserPayload(MRHelpers.createMRInputPayload(
- payloadConf,
- tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto())),
+ .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+ .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
+ .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteArray()),
new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Wed Aug 13 18:27:45 2014
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.StreamingUDF;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -91,6 +92,11 @@ public class TezPlanContainer extends Op
}
URI jarUri = new File(jarName).toURI();
jarLists.add(jarUri);
+ if ("StreamingUDF".equals(clazz.getSimpleName())) {
+ for (String fileName : StreamingUDF.getResourcesForJar()) {
+ jarLists.add(new File(fileName).toURI());
+ }
+ }
}
}
@@ -99,14 +105,6 @@ public class TezPlanContainer extends Op
jarLists.add(scriptUDFJarFile.toURI());
}
- // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
- // resources for them yet.
- // if ("StreamingUDF".equals(clazz.getSimpleName())) {
- // for (String fileName : StreamingUDF.getResourcesForJar()) {
- // jarLists.add(new File(fileName).toURI().toURL());
- // }
- // }
-
return TezResourceManager.getInstance().addTezResources(jarLists);
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Wed Aug 13 18:27:45 2014
@@ -19,7 +19,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
/**
* This interface is implemented by PhysicalOperators that can need to access
@@ -28,6 +28,6 @@ import org.apache.tez.runtime.api.TezPro
public interface TezTaskConfigurable {
- public void initialize(TezProcessorContext processorContext) throws ExecException;
+ public void initialize(ProcessorContext processorContext) throws ExecException;
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java Wed Aug 13 18:27:45 2014
@@ -21,17 +21,17 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.pig.backend.hadoop.executionengine.TaskContext;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters.MRCounter;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
-public class TezTaskContext extends TaskContext<TezProcessorContext> {
- private TezProcessorContext context;
+public class TezTaskContext extends TaskContext<ProcessorContext> {
+ private ProcessorContext context;
- public TezTaskContext(TezProcessorContext context) {
+ public TezTaskContext(ProcessorContext context) {
this.context = context;
}
@Override
- public TezProcessorContext get() {
+ public ProcessorContext get() {
return context;
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Wed Aug 13 18:27:45 2014
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
@@ -56,7 +56,7 @@ public class POCounterTez extends POCoun
}
@Override
- public void initialize(TezProcessorContext processorContext)
+ public void initialize(ProcessorContext processorContext)
throws ExecException {
this.setTaskId(processorContext.getTaskIndex());
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Wed Aug 13 18:27:45 2014
@@ -41,7 +41,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
public class LoaderProcessor extends TezOpPlanVisitor {
private Configuration conf;
@@ -130,7 +130,7 @@ public class LoaderProcessor extends Tez
tezOp.getLoaderInfo().setInpLimits(inpLimits);
// Not using MRInputAMSplitGenerator because delegation tokens are
// fetched in FileInputFormat
- tezOp.getLoaderInfo().setInputSplitInfo(MRHelpers.generateInputSplitsToMem(conf, false, 0));
+ tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
}
return lds;
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/impl/PigImplConstants.java Wed Aug 13 18:27:45 2014
@@ -41,4 +41,9 @@ public class PigImplConstants {
* Used by pig to indicate that current job has been converted to run in local mode
*/
public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
+
+ /**
+ * Indicate the split index of the task. Used by merge cogroup
+ */
+ public static final String PIG_SPLIT_INDEX = "pig.split.index";
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/impl/io/FileLocalizer.java Wed Aug 13 18:27:45 2014
@@ -798,6 +798,8 @@ public class FileLocalizer {
FileSystem srcFs;
if ( (!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))
&& uri.getScheme() == null )||
+ // For Windows local files
+ (uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
uri.getScheme().equals("local") ) {
srcFs = localFs;
} else {
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/scripting/ScriptingOutputCapturer.java Wed Aug 13 18:27:45 2014
@@ -66,7 +66,13 @@ public class ScriptingOutputCapturer {
String jobId = conf.get(MRConfiguration.JOB_ID);
String taskId = conf.get(MRConfiguration.TASK_ID);
- String hadoopLogDir = System.getProperty("hadoop.log.dir");
+ String hadoopLogDir = System.getProperty("yarn.app.container.log.dir");
+ if (hadoopLogDir == null) {
+ hadoopLogDir = conf.get("yarn.app.container.log.dir");
+ }
+ if (hadoopLogDir == null) {
+ hadoopLogDir = System.getProperty("hadoop.log.dir");
+ }
if (hadoopLogDir == null) {
hadoopLogDir = conf.get("hadoop.log.dir");
}
Modified: pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/maven/pig/src/main/java/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Wed Aug 13 18:27:45 2014
@@ -115,6 +115,9 @@ public class TezTaskStats extends JobSta
}
for (POStore sto : stores) {
+ if (sto.isTmpStore()) {
+ continue;
+ }
long records = -1;
long hdfsBytesWritten = JobStats.getOutputSize(sto, conf);
String filename = sto.getSFile().getFileName();
Modified: pig/branches/maven/pig/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/branches/maven/pig/test/e2e/harness/TestDriver.pm?rev=1617791&r1=1617790&r2=1617791&view=diff
==============================================================================
--- pig/branches/maven/pig/test/e2e/harness/TestDriver.pm (original)
+++ pig/branches/maven/pig/test/e2e/harness/TestDriver.pm Wed Aug 13 18:27:45 2014
@@ -717,8 +717,7 @@ sub runTestGroup() {
$testStatuses->{$testName} = $failedStr;
}
- $msg= "$msg at " . time . "\n";
- #print $msg;
+ $msg .= "\nEnding test $testName at " . time ."\n";
print $subLog $msg;
$duration = $endTime - $beginTime;
$dbinfo{'duration'} = $duration;
@@ -728,6 +727,7 @@ sub runTestGroup() {
if ($@) {
$msg= "ERROR $subName at : ".__LINE__." Failed to run test $testName <$@>\n";
+ $msg .= "\nEnding test $testName at " . time ."\n";
#print $msg;
print $subLog $msg;
$testStatuses->{$testName} = $abortedStr;
Propchange: pig/branches/maven/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1617330-1617783