Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC
svn commit: r1546314 [4/6] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Nov 28 08:56:33 2013
@@ -23,8 +23,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.joda.time.DateTimeZone;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -60,24 +58,25 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.joda.time.DateTimeZone;
/**
* This class contains the static Mapper & Reducer classes that
* are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a
+ * there is a reduce phase, the leaf is bound to be a
* POLocalRearrange. So the map phase has to separate the
* key and tuple and collect it into the output
* collector.
- *
+ *
* The shuffle and sort phase sorts these keys & tuples
* and creates key, List<Tuple> and passes the key and
* iterator to the list. The deserialized POPackage operator
- * is used to package the key, List<Tuple> into pigKey,
+ * is used to package the key, List<Tuple> into pigKey,
* Bag<Tuple> where pigKey is of the appropriate pig type and
* then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result
+ * plan which is executed if its not empty. Either the result
* of the reduce plan or the package res is collected into
- * the output collector.
+ * the output collector.
*
* The index of the tuple (that is, which bag it should be placed in by the
* package) is packed into the key. This is done so that hadoop sorts the
@@ -90,28 +89,28 @@ import org.apache.pig.tools.pigstats.Pig
public class PigGenericMapReduce {
public static JobContext sJobContext = null;
-
+
/**
- * @deprecated Use {@link UDFContext} instead in the following way to get
+ * @deprecated Use {@link UDFContext} instead in the following way to get
* the job's {@link Configuration}:
* <pre>UdfContext.getUdfContext().getJobConf()</pre>
*/
@Deprecated
public static Configuration sJobConf = null;
-
+
public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
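As the deprecation note above says, the supported way to reach the job's Configuration is through UDFContext; note the actual class and accessor are spelled UDFContext.getUDFContext(). A minimal sketch of that access pattern, with an illustrative class name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.util.UDFContext;

    public class JobConfLookup {
        // Fetch the job's Configuration through the UDFContext singleton
        // instead of reading the deprecated sJobConf field.
        public static Configuration currentJobConf() {
            return UDFContext.getUDFContext().getJobConf();
        }
    }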
-
+
public static class Map extends PigMapBase {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
HDataType.getWritableComparableTypes(tuple.get(1), keyType);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
@@ -122,7 +121,7 @@ public class PigGenericMapReduce {
oc.write(key, val);
}
}
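To make the (index, key, value) layout that collect() takes apart concrete, here is a minimal sketch built with TupleFactory; all names and values are illustrative:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class MapOutputTupleSketch {
        // Builds a tuple in the (index, key, value) layout that the
        // collect() above splits into a PigNullableWritable key and a
        // NullableTuple value.
        public static Tuple example() throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            Tuple value = tf.newTuple(1);
            value.set(0, "some field");
            Tuple out = tf.newTuple(3);
            out.set(0, (byte) 0);     // slot 0: input index
            out.set(1, "group-key");  // slot 1: the key (a chararray here)
            out.set(2, value);        // slot 2: the value tuple
            return out;
        }
    }

The index in slot 0 is what later tells POPackage which input bag the value belongs to.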
-
+
/**
* This "specialized" map class is ONLY to be used in pig queries with
* order by a udf. A UDF used for comparison in the order by expects
@@ -132,9 +131,9 @@ public class PigGenericMapReduce {
public static class MapWithComparator extends PigMapBase {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Object keyTuple = null;
if(keyType != DataType.TUPLE) {
Object k = tuple.get(1);
@@ -142,13 +141,13 @@ public class PigGenericMapReduce {
} else {
keyTuple = tuple.get(1);
}
-
+
Byte index = (Byte)tuple.get(0);
PigNullableWritable key =
HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
@@ -166,23 +165,23 @@ public class PigGenericMapReduce {
public static class MapWithPartitionIndex extends Map {
@Override
- public void collect(Context oc, Tuple tuple)
+ public void collect(Context oc, Tuple tuple)
throws InterruptedException, IOException {
-
+
Byte tupleKeyIdx = 2;
Byte tupleValIdx = 3;
Byte index = (Byte)tuple.get(0);
- Integer partitionIndex = -1;
- // for partitioning table, the partition index isn't present
- if (tuple.size() == 3) {
- //super.collect(oc, tuple);
- //return;
- tupleKeyIdx--;
- tupleValIdx--;
- } else {
- partitionIndex = (Integer)tuple.get(1);
- }
+ Integer partitionIndex = -1;
+ // for partitioning table, the partition index isn't present
+ if (tuple.size() == 3) {
+ //super.collect(oc, tuple);
+ //return;
+ tupleKeyIdx--;
+ tupleValIdx--;
+ } else {
+ partitionIndex = (Integer)tuple.get(1);
+ }
PigNullableWritable key =
HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
@@ -190,13 +189,13 @@ public class PigGenericMapReduce {
NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-
+
// Both the key and the value need the index. The key needs it so
// that it can be sorted on the index in addition to the key
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
wrappedKey.setIndex(index);
-
+
// set the partition
wrappedKey.setPartition(partitionIndex);
val.setIndex(index);
@@ -204,14 +203,14 @@ public class PigGenericMapReduce {
}
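The NullablePartitionWritable wrapping above is what lets the skewed-join partitioner read a precomputed target reducer straight off the key. A self-contained sketch of the same wrapping, with illustrative values:

    import org.apache.pig.backend.hadoop.HDataType;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.io.NullablePartitionWritable;
    import org.apache.pig.impl.io.PigNullableWritable;

    public class SkewedKeySketch {
        public static NullablePartitionWritable wrap() throws Exception {
            PigNullableWritable key =
                    HDataType.getWritableComparableTypes("heavy-key", DataType.CHARARRAY);
            NullablePartitionWritable wrapped = new NullablePartitionWritable(key);
            wrapped.setIndex((byte) 0);   // which join input the record came from
            wrapped.setPartition(3);      // illustrative target reducer
            return wrapped;
        }
    }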
@Override
- protected void runPipeline(PhysicalOperator leaf)
+ protected void runPipeline(PhysicalOperator leaf)
throws IOException, InterruptedException {
-
+
while(true){
Result res = leaf.getNextTuple();
-
+
if(res.returnStatus==POStatus.STATUS_OK){
- // For POPartitionRearrange, the result is a bag.
+ // For POPartitionRearrange, the result is a bag.
// This operator is used for skewed join
if (res.result instanceof DataBag) {
Iterator<Tuple> its = ((DataBag)res.result).iterator();
@@ -223,7 +222,7 @@ public class PigGenericMapReduce {
}
continue;
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return;
}
@@ -233,7 +232,7 @@ public class PigGenericMapReduce {
}
if(res.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInMap = true;
// if there is an errmessage use it
@@ -253,39 +252,39 @@ public class PigGenericMapReduce {
}
}
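The bag case in this loop exists because POPartitionRearrange (used by skewed join) returns a DataBag of pre-routed tuples for a single input record. The fan-out in isolation, with a hypothetical emit() sink standing in for collect():

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class BagFlattenSketch {
        // Hypothetical sink standing in for collect(oc, tuple).
        interface Sink { void emit(Tuple t) throws Exception; }

        // A bag result fans out into one emit per contained tuple;
        // anything else is a single packaged tuple.
        public static void flatten(Object result, Sink sink) throws Exception {
            if (result instanceof DataBag) {
                for (Tuple t : (DataBag) result) {
                    sink.emit(t);
                }
            } else {
                sink.emit((Tuple) result);
            }
        }
    }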
- abstract public static class Reduce
+ abstract public static class Reduce
extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
protected final Log log = LogFactory.getLog(getClass());
-
+
//The reduce plan
protected PhysicalPlan rp = null;
// Store operators
protected List<POStore> stores;
-
+
//The POPackage operator which is the
//root of every Map Reduce plan is
//obtained through the job conf. The portion
//remaining after its removal is the reduce
//plan
protected POPackage pack;
-
+
ProgressableReporter pigReporter;
protected Context outputCollector;
protected boolean errorInReduce = false;
-
+
PhysicalOperator[] roots;
private PhysicalOperator leaf;
-
+
PigContext pigContext = null;
protected volatile boolean initialized = false;
-
+
private boolean inIllustrator = false;
-
+
/**
* Set the reduce plan: to be used by local runner for illustrator
* @param plan Reduce plan
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-
+
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
SchemaTupleBackend.initialize(jConf, pigContext);
@@ -337,36 +336,36 @@ public class PigGenericMapReduce {
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
leaf = rp.getLeaves().get(0);
}
-
+
// Get the UDF specific context
- MapRedUtil.setupUDFContext(jConf);
-
+ MapRedUtil.setupUDFContext(jConf);
+
} catch (IOException ioe) {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);
}
log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
-
+
String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
}
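The timezone normalization above pins the JVM default to a fixed UTC offset so that all tasks serialize datetimes identically. The same joda-time conversion as a standalone sketch, with an illustrative property value:

    import org.joda.time.DateTimeZone;

    public class DefaultTzSketch {
        public static void main(String[] args) {
            // Illustrative value of the pig.datetime.default.tz property.
            String dtzStr = "America/Los_Angeles";
            // Resolve the named zone's offset "now" and install it as a
            // fixed-offset default, mirroring the setup() code above.
            DateTimeZone fixed = DateTimeZone.forOffsetMillis(
                    DateTimeZone.forID(dtzStr).getOffset(null));
            DateTimeZone.setDefault(fixed);
            System.out.println(DateTimeZone.getDefault());
        }
    }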
-
+
/**
* The reduce function which packages the key and List<Tuple>
* into key, Bag<Tuple> after converting Hadoop type key into Pig type.
* The package result is either collected as is, if the reduce plan is
* empty or after passing through the reduce plan.
- */
+ */
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
- throws IOException, InterruptedException {
-
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ throws IOException, InterruptedException {
+
if (!initialized) {
initialized = true;
-
+
// cache the collector for use in runPipeline()
// which could additionally be called from close()
this.outputCollector = context;
@@ -379,24 +378,24 @@ public class PigGenericMapReduce {
pigHadoopLogger.setAggregate(aggregateWarning);
PigStatusReporter.setContext(context);
pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
+
PhysicalOperator.setPigLogger(pigHadoopLogger);
if (!inIllustrator)
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
}
-
+
// In the case we optimize the join, we combine
// POPackage and POForeach - so we could get many
// tuples out of the getnext() call of POJoinPackage
- // In this case, we process till we see EOP from
+ // In this case, we process till we see EOP from
// POJoinPackage.getNext()
- if (pack instanceof POJoinPackage)
+ if (pack.getPkgr() instanceof JoinPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
@@ -410,18 +409,18 @@ public class PigGenericMapReduce {
// give only one tuple out for the key
pack.attachInput(key, tupIter.iterator());
processOnePackageOutput(context);
- }
+ }
}
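Because the combiner optimization can fuse POForeach into the packager, a JoinPackager may emit many tuples for one reduce key, which is why the branch above drains until EOP. A sketch of that drain loop in isolation (error handling simplified):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;

    public class PackageDrainSketch {
        // Pulls every tuple the attached packager will produce for the
        // current key. With a JoinPackager this can be many tuples; with
        // the default packager a single getNextTuple() call would do.
        public static int drain(POPackage pack) throws Exception {
            int produced = 0;
            while (true) {
                Result res = pack.getNextTuple();
                if (res.returnStatus == POStatus.STATUS_EOP) {
                    return produced;
                }
                if (res.returnStatus == POStatus.STATUS_ERR) {
                    throw new RuntimeException("package operator failed");
                }
                if (res.returnStatus == POStatus.STATUS_OK) {
                    produced++;  // res.result holds the packaged Tuple
                }
                // STATUS_NULL: nothing this iteration, keep draining
            }
        }
    }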
-
+
// return: false-more output
// true- end of processing
- public boolean processOnePackageOutput(Context oc)
+ public boolean processOnePackageOutput(Context oc)
throws IOException, InterruptedException {
Result res = pack.getNextTuple();
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
-
+
if(rp.isEmpty()){
oc.write(null, packRes);
return false;
@@ -430,35 +429,35 @@ public class PigGenericMapReduce {
roots[i].attachInput(packRes);
}
runPipeline(leaf);
-
+
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL) {
return false;
}
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
int errCode = 2093;
String msg = "Encountered error in package operator while processing group.";
throw new ExecException(msg, errCode, PigException.BUG);
}
-
+
if(res.returnStatus==POStatus.STATUS_EOP) {
return true;
}
-
+
return false;
-
+
}
-
+
/**
* @param leaf
* @throws InterruptedException
- * @throws IOException
+ * @throws IOException
*/
- protected void runPipeline(PhysicalOperator leaf)
+ protected void runPipeline(PhysicalOperator leaf)
throws InterruptedException, IOException {
-
+
while(true)
{
Result redRes = leaf.getNextTuple();
@@ -470,17 +469,17 @@ public class PigGenericMapReduce {
}
continue;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_EOP) {
return;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_NULL) {
continue;
}
-
+
if(redRes.returnStatus==POStatus.STATUS_ERR){
- // remember that we had an issue so that in
+ // remember that we had an issue so that in
// close() we can do the right thing
errorInReduce = true;
// if there is an errmessage use it
@@ -497,22 +496,22 @@ public class PigGenericMapReduce {
}
}
}
-
+
/**
* Will be called once all the intermediate keys and values are
* processed. So right place to stop the reporter thread.
*/
- @Override
+ @Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
-
+
if(errorInReduce) {
// there was an error in reduce - just return
return;
}
-
+
if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
- // If there is a stream in the pipeline we could
+ // If there is a stream in the pipeline we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
// already and then lets run the pipeline one more time
@@ -525,7 +524,7 @@ public class PigGenericMapReduce {
if (!inIllustrator) {
for (POStore store: stores) {
if (!initialized) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
@@ -533,7 +532,7 @@ public class PigGenericMapReduce {
store.tearDown();
}
}
-
+
//Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
try {
@@ -541,14 +540,14 @@ public class PigGenericMapReduce {
} catch (VisitorException e) {
throw new IOException("Error trying to finish UDFs",e);
}
-
+
PhysicalOperator.setReporter(null);
initialized = false;
}
-
+
/**
* Get reducer's illustrator context
- *
+ *
* @param input Input buffer as output by maps
* @param pkg package
* @return reducer's illustrator context
@@ -557,25 +556,25 @@ public class PigGenericMapReduce {
*/
abstract public Context getIllustratorContext(Job job,
List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
-
+
abstract public boolean inIllustrator(Context context);
-
+
abstract public POPackage getPack(Context context);
}
-
+
/**
* This "specialized" reduce class is ONLY to be used in pig queries with
* order by a udf. A UDF used for comparison in the order by expects
* to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
- * ensures that the "key" used in the order by is wrapped into a tuple (if it
+ * ensures that the "key" used in the order by is wrapped into a tuple (if it
* isn't already a tuple). This reduce class unwraps this tuple in the case where
* the map had wrapped it into a tuple and hands the "unwrapped" key to the POPackage
* for processing
*/
public static class ReduceWithComparator extends PigMapReduce.Reduce {
-
+
private byte keyType;
-
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
@@ -583,7 +582,7 @@ public class PigGenericMapReduce {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- keyType = pack.getKeyType();
+ keyType = pack.getPkgr().getKeyType();
}
/**
@@ -593,12 +592,12 @@ public class PigGenericMapReduce {
* empty or after passing through the reduce plan.
*/
@Override
- protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+ protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
throws IOException, InterruptedException {
-
+
if (!initialized) {
initialized = true;
-
+
// cache the collector for use in runPipeline()
// which could additionally be called from close()
this.outputCollector = context;
@@ -606,24 +605,24 @@ public class PigGenericMapReduce {
PhysicalOperator.setReporter(pigReporter);
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
PigStatusReporter.setContext(context);
pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
-
+
for (POStore store: stores) {
- MapReducePOStoreImpl impl
+ MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
store.setUp();
}
}
-
+
// If the keyType is not a tuple, the MapWithComparator.collect()
- // would have wrapped the key into a tuple so that the
+ // would have wrapped the key into a tuple so that the
// comparison UDF used in the order by can process it.
// We need to unwrap the key out of the tuple and hand it
// to the POPackage for processing
@@ -635,31 +634,31 @@ public class PigGenericMapReduce {
throw e;
}
}
-
+
pack.attachInput(key, tupIter.iterator());
-
+
Result res = pack.getNextTuple();
if(res.returnStatus==POStatus.STATUS_OK){
Tuple packRes = (Tuple)res.result;
-
+
if(rp.isEmpty()){
context.write(null, packRes);
return;
}
-
+
rp.attachInput(packRes);
List<PhysicalOperator> leaves = rp.getLeaves();
-
+
PhysicalOperator leaf = leaves.get(0);
runPipeline(leaf);
-
+
}
-
+
if(res.returnStatus==POStatus.STATUS_NULL) {
return;
}
-
+
if(res.returnStatus==POStatus.STATUS_ERR){
int errCode = 2093;
String msg = "Encountered error in package operator while processing group.";
@@ -669,5 +668,5 @@ public class PigGenericMapReduce {
}
}
-
+
}
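The wrap/unwrap round trip described above, isolated into a sketch (the actual unwrapping code falls outside the hunks shown here):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class KeyWrapSketch {
        private static final TupleFactory tf = TupleFactory.getInstance();

        // Map side: a non-tuple key is wrapped so a comparison UDF
        // always receives a Tuple.
        public static Tuple wrap(Object key) throws Exception {
            Tuple t = tf.newTuple(1);
            t.set(0, key);
            return t;
        }

        // Reduce side: recover the original key before handing it
        // to the POPackage.
        public static Object unwrap(Tuple wrapped) throws Exception {
            return wrapped.get(0);
        }
    }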
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Thu Nov 28 08:56:33 2013
@@ -30,10 +30,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -100,7 +100,7 @@ public class SecondaryKeyOptimizer exten
columns
.add(rearrange.getIndex()
& PigNullableWritable.idxSpace);
-
+
// The first item inside columnChainInfo is set to type Tuple.
// This value is not actually in use, but it intends to match
// the type of POProject in reduce side
@@ -111,13 +111,13 @@ public class SecondaryKeyOptimizer exten
if (node instanceof POProject) {
POProject project = (POProject) node;
if(project.isProjectToEnd()){
- columnChainInfo.insert(project.getStartCol(),
+ columnChainInfo.insert(project.getStartCol(),
project.getResultType());
}else {
columnChainInfo.insert(
project.getColumns(), project.getResultType());
}
-
+
if (plan.getSuccessors(node) == null)
node = null;
else if (plan.getSuccessors(node).size() != 1) {
@@ -154,8 +154,7 @@ public class SecondaryKeyOptimizer exten
List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
if (mapLeaves == null || mapLeaves.size() != 1) {
- log
- .debug("Expected map to have single leaf! Skip secondary key optimizing");
+ log.debug("Expected map to have single leaf! Skip secondary key optimizing");
return;
}
PhysicalOperator mapLeaf = mapLeaves.get(0);
@@ -165,8 +164,7 @@ public class SecondaryKeyOptimizer exten
if (mapLeaf instanceof POLocalRearrange) {
SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
if (sortKeyInfo == null) {
- log
- .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+ log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
return;
}
sortKeyInfos.add(sortKeyInfo);
@@ -188,8 +186,7 @@ public class SecondaryKeyOptimizer exten
return;
}
} catch (ExecException e) {
- log
- .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+ log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
return;
}
@@ -200,15 +197,13 @@ public class SecondaryKeyOptimizer exten
List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
if (reduceRoots.size() != 1) {
- log
- .debug("Expected reduce to have single root, skip secondary key optimizing");
+ log.debug("Expected reduce to have single root, skip secondary key optimizing");
return;
}
PhysicalOperator root = reduceRoots.get(0);
if (!(root instanceof POPackage)) {
- log
- .debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+ log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
return;
}
@@ -217,7 +212,8 @@ public class SecondaryKeyOptimizer exten
PhysicalOperator currentNode = root;
POForEach foreach = null;
while (currentNode != null) {
- if (currentNode instanceof POPackage && !(currentNode instanceof POJoinPackage)
+ if (currentNode instanceof POPackage
+ && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
|| currentNode instanceof POFilter
|| currentNode instanceof POLimit) {
List<PhysicalOperator> succs = mr.reducePlan
@@ -237,7 +233,7 @@ public class SecondaryKeyOptimizer exten
return;
}
}
-
+
// We do not find a foreach (we shall not come here, a trick to fool findbugs)
if (foreach==null)
return;
@@ -355,7 +351,7 @@ public class SecondaryKeyOptimizer exten
for (PhysicalOperator pred : preds) {
POLocalRearrange rearrange = (POLocalRearrange) pred;
rearrange.setUseSecondaryKey(true);
- if (rearrange.getIndex() == indexOfRearrangeToChange) {
+ if (rearrange.getIndex() == indexOfRearrangeToChange) {
// Try to find the POLocalRearrange for the secondary key
found = true;
setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
@@ -368,7 +364,7 @@ public class SecondaryKeyOptimizer exten
}
}
POPackage pack = (POPackage) root;
- pack.setUseSecondaryKey(true);
+ pack.getPkgr().setUseSecondaryKey(true);
}
}
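Isolating the final wiring: the map-side rearranges and the reduce-side packager must agree that a compound (main, secondary) key is in play. A minimal sketch of that handshake, assuming the operators have already been located:

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;

    public class SecondaryKeyWiring {
        // Hypothetical helper: callers pass the rearranges feeding the
        // shuffle and the POPackage rooted in the reduce plan.
        public static void enable(Iterable<POLocalRearrange> rearranges, POPackage pack) {
            for (POLocalRearrange lr : rearranges) {
                lr.setUseSecondaryKey(true);       // emit compound keys
            }
            pack.getPkgr().setUseSecondaryKey(true); // unpack compound keys
        }
    }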
@@ -443,7 +439,7 @@ public class SecondaryKeyOptimizer exten
// sort key.
private static class SecondaryKeyDiscover {
PhysicalPlan mPlan;
-
+
List<POSort> sortsToRemove = new ArrayList<POSort>();
List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
@@ -461,7 +457,7 @@ public class SecondaryKeyOptimizer exten
this.sortKeyInfos = sortKeyInfos;
this.secondarySortKeyInfo = secondarySortKeyInfo;
}
-
+
public void process() throws FrontendException
{
List<PhysicalOperator> roots = mPlan.getRoots();
@@ -470,7 +466,7 @@ public class SecondaryKeyOptimizer exten
processRoot(root);
}
}
-
+
public void processRoot(PhysicalOperator root) throws FrontendException {
PhysicalOperator currentNode = root;
while (currentNode!=null) {
@@ -486,10 +482,10 @@ public class SecondaryKeyOptimizer exten
// We don't process foreach, since foreach is too complex to get right
currentNode instanceof POForEach)
break;
-
+
if (sawInvalidPhysicalOper)
break;
-
+
List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
if (succs==null)
currentNode = null;
@@ -609,7 +605,7 @@ public class SecondaryKeyOptimizer exten
static private boolean collectColumnChain(PhysicalPlan plan,
ColumnChainInfo columnChainInfo) throws PlanException {
if (plan.getRoots().size() != 1) {
- return true;
+ return true;
}
PhysicalOperator currentNode = plan.getRoots().get(0);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Thu Nov 28 08:56:33 2013
@@ -27,11 +27,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -57,7 +56,7 @@ public class POPackageAnnotator extends
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
-
+
// POPackage OR POJoinPackage could be present in the combine plan
// OR in the reduce plan. POPostCombinerPackage could
// be present only in the reduce plan. Search in these two
@@ -68,9 +67,9 @@ public class POPackageAnnotator extends
POPackage pkg = pkgDiscoverer.getPkg();
if(pkg != null) {
handlePackage(mr, pkg);
- }
+ }
}
-
+
if(!mr.reducePlan.isEmpty()) {
PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
pkgDiscoverer.visit();
@@ -78,7 +77,7 @@ public class POPackageAnnotator extends
if(pkg != null) {
// if the POPackage is actually a POPostCombinerPackage, then we should
// just look for the corresponding LocalRearrange(s) in the combine plan
- if(pkg instanceof POCombinerPackage) {
+ if (pkg.getPkgr() instanceof CombinerPackager) {
if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
int errCode = 2085;
String msg = "Unexpected problem during optimization." +
@@ -90,14 +89,14 @@ public class POPackageAnnotator extends
}
}
}
-
+
}
-
+
private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
// the LocalRearrange(s) could either be in the map of this MapReduceOper
// OR in the reduce of predecessor MapReduceOpers
int lrFound = 0;
-
+
lrFound = patchPackage(mr.mapPlan, pkg);
if(lrFound != pkg.getNumInps()) {
// we did not find the LocalRearrange(s) in the map plan
@@ -110,7 +109,7 @@ public class POPackageAnnotator extends
lrFound += patchPackage(mrOper.reducePlan, pkg);
if(lrFound == pkg.getNumInps()) {
break;
- }
+ }
}
}
if(lrFound != pkg.getNumInps()) {
@@ -127,7 +126,7 @@ public class POPackageAnnotator extends
// the package
return lrDiscoverer.getLoRearrangeFound();
}
-
+
/**
* Simple visitor of the "Reduce" physical plan
* which will get a reference to the POPacakge
@@ -136,36 +135,15 @@ public class POPackageAnnotator extends
static class PackageDiscoverer extends PhyPlanVisitor {
private POPackage pkg;
-
+
public PackageDiscoverer(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitPackage(POPackage pkg) throws VisitorException {
this.pkg = pkg;
};
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
- */
- @Override
- public void visitJoinPackage(POJoinPackage joinPackage)
- throws VisitorException {
- this.pkg = joinPackage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
- */
- @Override
- public void visitCombinerPackage(POCombinerPackage pkg)
- throws VisitorException {
- this.pkg = pkg;
- }
/**
* @return the pkg
@@ -173,9 +151,9 @@ public class POPackageAnnotator extends
public POPackage getPkg() {
return pkg;
}
-
+
}
-
+
/**
* Physical Plan visitor which tries to get the
* LocalRearrange(s) present in the plan (if any) and
@@ -184,24 +162,21 @@ public class POPackageAnnotator extends
* present in the "key")
*/
static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-
+
private int loRearrangeFound = 0;
private POPackage pkg;
-
+
public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.pkg = pkg;
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
loRearrangeFound++;
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
- if (pkg instanceof POPackageLite) {
+ if (pkg.getPkgr() instanceof LitePackager) {
if(lrearrange.getIndex() != 0) {
// Throw some exception here
throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -210,26 +185,26 @@ public class POPackageAnnotator extends
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
- keyInfo = pkg.getKeyInfo();
+ keyInfo = pkg.getPkgr().getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-
+
if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
- // something is wrong - we should not be getting key info
+ // something is wrong - we should not be getting key info
// for the same index from two different Local Rearranges
int errCode = 2087;
String msg = "Unexpected problem during optimization." +
- " Found index:" + lrearrange.getIndex() +
+ " Found index:" + lrearrange.getIndex() +
" in multiple LocalRearrange operators.";
throw new OptimizerException(msg, errCode, PigException.BUG);
-
+
}
- keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+ keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.setKeyInfo(keyInfo);
- pkg.setKeyTuple(lrearrange.isKeyTuple());
- pkg.setKeyCompound(lrearrange.isKeyCompound());
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
}
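The keyInfo structure populated above maps a LocalRearrange index to a pair of (is-project-star, value-column-to-key-position map). Building one entry by hand, with illustrative column numbers:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.impl.util.Pair;

    public class KeyInfoSketch {
        public static Map<Integer, Pair<Boolean, Map<Integer, Integer>>> example() {
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
                    new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
            // Illustrative: input #0 projects column 2 of the value into
            // position 0 of the key, and is not a project-star.
            Map<Integer, Integer> projectedCols = new HashMap<Integer, Integer>();
            projectedCols.put(2, 0);
            keyInfo.put(Integer.valueOf(0),
                    new Pair<Boolean, Map<Integer, Integer>>(Boolean.FALSE, projectedCols));
            return keyInfo;
        }
    }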
/**
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Nov 28 08:56:33 2013
@@ -20,18 +20,67 @@ package org.apache.pig.backend.hadoop.ex
import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
/**
* The visitor class for the Physical Plan. To use this,
- * create the visitor with the plan to be visited. Call
+ * create the visitor with the plan to be visited. Call
* the visit() method to traverse the plan in a depth first
* fashion.
- *
+ *
* This class also visits the nested plans inside the operators.
* One has to extend this class to modify the nature of each visit
* and to maintain any relevant state information between the visits
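As a usage illustration of the pattern this javadoc describes, a hypothetical subclass that counts POPackage operators (the walker construction mirrors the visitors later in this commit):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
    import org.apache.pig.impl.plan.DepthFirstWalker;
    import org.apache.pig.impl.plan.VisitorException;

    public class PackageCounter extends PhyPlanVisitor {
        private int count = 0;

        public PackageCounter(PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            count++;
        }

        public int getCount() {
            return count;
        }
    }

Calling new PackageCounter(plan).visit() walks the plan depth first, including nested inner plans; getCount() then returns the total.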
@@ -47,15 +96,15 @@ public class PhyPlanVisitor extends Plan
public void visitLoad(POLoad ld) throws VisitorException{
//do nothing
}
-
+
public void visitStore(POStore st) throws VisitorException{
//do nothing
}
-
+
public void visitNative(PONative nat) throws VisitorException{
//do nothing
}
-
+
public void visitFilter(POFilter fl) throws VisitorException{
pushWalker(mCurrentWalker.spawnChildWalker(fl.getPlan()));
visit();
@@ -71,7 +120,7 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
}
-
+
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
List<PhysicalPlan> inpPlans = lr.getPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -84,19 +133,11 @@ public class PhyPlanVisitor extends Plan
public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
//do nothing
}
-
+
public void visitPackage(POPackage pkg) throws VisitorException{
//do nothing
}
-
- public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
- //do nothing
- }
-
- public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
- //do nothing
- }
-
+
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<PhysicalPlan> inpPlans = nfe.getInputPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -105,11 +146,11 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
}
-
+
public void visitUnion(POUnion un) throws VisitorException{
//do nothing
}
-
+
public void visitSplit(POSplit spl) throws VisitorException{
List<PhysicalPlan> plans = spl.getPlans();
for (PhysicalPlan plan : plans) {
@@ -136,78 +177,78 @@ public class PhyPlanVisitor extends Plan
//do nothing
}
- public void visitDistinct(PODistinct distinct) throws VisitorException {
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
//do nothing
- }
+ }
- public void visitSort(POSort sort) throws VisitorException {
+ public void visitSort(POSort sort) throws VisitorException {
List<PhysicalPlan> inpPlans = sort.getSortPlans();
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
popWalker();
}
- }
-
+ }
+
public void visitConstant(ConstantExpression cnst) throws VisitorException{
//do nothing
}
-
+
public void visitProject(POProject proj) throws VisitorException{
//do nothing
}
-
+
public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
//do nothing
}
-
+
public void visitLessThan(LessThanExpr lt) throws VisitorException{
//do nothing
}
-
+
public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
//do nothing
}
-
+
public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
//do nothing
}
-
+
public void visitEqualTo(EqualToExpr eq) throws VisitorException{
//do nothing
}
-
+
public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
//do nothing
}
-
+
public void visitRegexp(PORegexp re) throws VisitorException{
//do nothing
}
public void visitIsNull(POIsNull isNull) throws VisitorException {
}
-
+
public void visitAdd(Add add) throws VisitorException{
//do nothing
}
-
+
public void visitSubtract(Subtract sub) throws VisitorException {
//do nothing
}
-
+
public void visitMultiply(Multiply mul) throws VisitorException {
//do nothing
}
-
+
public void visitDivide(Divide dv) throws VisitorException {
//do nothing
}
-
+
public void visitMod(Mod mod) throws VisitorException {
//do nothing
}
-
+
public void visitAnd(POAnd and) throws VisitorException {
//do nothing
}
@@ -222,83 +263,79 @@ public class PhyPlanVisitor extends Plan
public void visitBinCond(POBinCond binCond) {
// do nothing
-
+
}
public void visitNegative(PONegative negative) {
//do nothing
-
+
}
-
+
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
//do nothing
}
-
+
public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
//do nothing
}
public void visitMapLookUp(POMapLookUp mapLookUp) {
// TODO Auto-generated method stub
-
- }
-
- public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
- //do nothing
+
}
public void visitCast(POCast cast) {
// TODO Auto-generated method stub
-
+
}
-
+
public void visitLimit(POLimit lim) throws VisitorException{
//do nothing
}
-
+
public void visitCross(POCross cross) throws VisitorException{
//do nothing
}
-
+
public void visitFRJoin(POFRJoin join) throws VisitorException {
//do nothing
}
-
+
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
//do nothing
}
-
+
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
-
+
}
/**
* @param stream
- * @throws VisitorException
+ * @throws VisitorException
*/
public void visitStream(POStream stream) throws VisitorException {
// TODO Auto-generated method stub
-
+
}
- public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+ public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
- }
+ }
- public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+ public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
List<PhysicalPlan> inpPlans = pr.getPlans();
for (PhysicalPlan plan : inpPlans) {
pushWalker(mCurrentWalker.spawnChildWalker(plan));
visit();
popWalker();
}
- }
+ }
/**
* @param optimizedForEach
*/
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
// TODO Auto-generated method stub
-
+
}
/**
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Nov 28 08:56:33 2013
@@ -30,7 +30,23 @@ import java.util.Set;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -46,7 +62,7 @@ public class PlanPrinter<O extends Opera
String TABMore = "| ";
String LSep = "|\n|---";
-
+
String USep = "| |\n| ";
int levelCntr = -1;
@@ -58,7 +74,7 @@ public class PlanPrinter<O extends Opera
public PlanPrinter(P plan) {
super(plan, new DepthFirstWalker<O, P>(plan));
}
-
+
public PlanPrinter(P plan, PrintStream stream) {
super(plan, new DepthFirstWalker<O, P>(plan));
this.stream = stream;
@@ -124,7 +140,7 @@ public class PlanPrinter<O extends Opera
sb.delete(sb.length() - "\n".length(), sb.length());
return sb.toString();
}
-
+
private String planString(PhysicalPlan pp){
StringBuilder sb = new StringBuilder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -136,7 +152,7 @@ public class PlanPrinter<O extends Opera
sb.append(shiftStringByTabs(baos.toString(), 2));
return sb.toString();
}
-
+
private String planString(List<PhysicalPlan> lep){
StringBuilder sb = new StringBuilder();
if(lep!=null)
@@ -175,12 +191,6 @@ public class PlanPrinter<O extends Opera
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
}
- else if (node instanceof POMultiQueryPackage) {
- List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
- for (POPackage pkg : pkgs) {
- sb.append(LSep + pkg.name() + "\n");
- }
- }
else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
@@ -193,13 +203,13 @@ public class PlanPrinter<O extends Opera
POSkewedJoin skewed = (POSkewedJoin)node;
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
if(joinPlans!=null) {
- List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
- inner_plans.addAll(joinPlans.values());
- sb.append(planString(inner_plans));
+ List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
+ inner_plans.addAll(joinPlans.values());
+ sb.append(planString(inner_plans));
}
}
}
-
+
if (node instanceof POSplit) {
sb.append(planString(((POSplit)node).getPlans()));
}
@@ -210,13 +220,13 @@ public class PlanPrinter<O extends Opera
plans.addAll(pl);
sb.append(planString(plans));
}
-
+
List<O> originalPredecessors = mPlan.getPredecessors(node);
if (originalPredecessors == null)
return sb.toString();
-
+
List<O> predecessors = new ArrayList<O>(originalPredecessors);
-
+
Collections.sort(predecessors);
int i = 0;
for (O pred : predecessors) {
@@ -280,5 +290,5 @@ public class PlanPrinter<O extends Opera
public void visitStartMap(POUnion op) {
stream.print(op.name() + " ");
}
-
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Thu Nov 28 08:56:33 2013
@@ -40,8 +40,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -56,10 +54,10 @@ import org.w3c.dom.Element;
public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
PhyPlanVisitor {
-
+
private Document doc = null;
private Element parent = null;
-
+
public XMLPhysicalPlanPrinter(PhysicalPlan plan, Document doc, Element parent) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.doc = doc;
@@ -84,7 +82,7 @@ public class XMLPhysicalPlanPrinter<P ex
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-
+
StringWriter sw = new StringWriter();
StreamResult result = new StreamResult(sw);
DOMSource source = new DOMSource(doc);
@@ -94,7 +92,7 @@ public class XMLPhysicalPlanPrinter<P ex
e.printStackTrace();
}
}
-
+
private Element createAlias(PhysicalOperator po) {
Element aliasNode = null;
String alias = po.getAlias();
@@ -112,8 +110,8 @@ public class XMLPhysicalPlanPrinter<P ex
depthFirst(leaf, parentNode);
}
}
-
-
+
+
private void visitPlan(PhysicalPlan pp, Element parentNode) throws VisitorException {
if(pp!=null) {
XMLPhysicalPlanPrinter<PhysicalPlan> ppp =
@@ -121,15 +119,15 @@ public class XMLPhysicalPlanPrinter<P ex
ppp.visit();
}
}
-
-
+
+
private void visitPlan(List<PhysicalPlan> lep, Element parentNode) throws VisitorException {
if(lep!=null)
for (PhysicalPlan ep : lep) {
visitPlan(ep, parentNode);
}
}
-
+
private Element createPONode(PhysicalOperator node) {
Element PONode = doc.createElement(node.getClass().getSimpleName());
PONode.setAttribute("scope", "" + node.getOperatorKey().id);
@@ -150,18 +148,18 @@ public class XMLPhysicalPlanPrinter<P ex
Element loadFile = doc.createElement("loadFile");
loadFile.setTextContent(((POLoad)node).getLFile().getFileName());
PONode.appendChild(loadFile);
-
+
Element isTmpLoad = doc.createElement("isTmpLoad");
isTmpLoad.setTextContent(Boolean.valueOf(((POLoad)node).isTmpLoad()).toString());
PONode.appendChild(isTmpLoad);
}
return PONode;
}
-
+
private void depthFirst(PhysicalOperator node, Element parentNode) throws VisitorException {
Element childNode = null;
-
+
List<PhysicalPlan> subPlans = new ArrayList<PhysicalPlan>();
if(node instanceof POFilter){
subPlans.add(((POFilter) node).getPlan());
@@ -177,12 +175,6 @@ public class XMLPhysicalPlanPrinter<P ex
subPlans = ((POSplit)node).getPlans();
} else if (node instanceof PODemux) {
subPlans = ((PODemux)node).getPlans();
- } else if (node instanceof POMultiQueryPackage) {
- childNode = createPONode(node);
- List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
- for (POPackage pkg : pkgs) {
- childNode.appendChild(createPONode(pkg));
- }
} else if(node instanceof POFRJoin){
childNode = createPONode(node);
POFRJoin frj = (POFRJoin)node;
@@ -198,11 +190,11 @@ public class XMLPhysicalPlanPrinter<P ex
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
if(joinPlans!=null) {
List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
- inner_plans.addAll(joinPlans.values());
- visitPlan(inner_plans, childNode);
+ inner_plans.addAll(joinPlans.values());
+ visitPlan(inner_plans, childNode);
}
}
-
+
if (childNode == null) {
childNode = createPONode(node);
if (subPlans.size() > 0) {
@@ -210,17 +202,17 @@ public class XMLPhysicalPlanPrinter<P ex
}
}
parentNode.appendChild(childNode);
-
+
List<PhysicalOperator> originalPredecessors = mPlan.getPredecessors(node);
if (originalPredecessors == null) {
return;
}
-
+
List<PhysicalOperator> predecessors = new ArrayList<PhysicalOperator>(originalPredecessors);
-
+
Collections.sort(predecessors);
for (PhysicalOperator pred : predecessors) {
depthFirst(pred, childNode);
}
}
-}
\ No newline at end of file
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+/**
+ * The package operator that packages the globally rearranged tuples into
+ * output format after the combiner stage. It differs from POPackage in that
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in. Instead, the inputs are put in a bag corresponding to their
+ * offset in the tuple.
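+ *
+ * For example (illustrative only): for a grouped relation with key "apple"
+ * and mBags = {false, true}, the incoming value tuples {((1)), ((2))} are
+ * packaged into the single output tuple (apple, {(1), (2)}).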
+ */
+public class CombinerPackager extends Packager {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean[] mBags; // For each field, indicates whether or not it
+ // needs to be put in a bag.
+
+ private Map<Integer, Integer> keyLookup;
+
+ private int numBags;
+
+    /**
+     * A new CombinerPackager will be constructed as a near clone of the
+     * provided Packager.
+     * @param pkgr Packager to clone.
+     * @param bags for each field, indicates whether it should be a bag (true)
+     * or a simple field (false).
+     */
+ public CombinerPackager(Packager pkgr, boolean[] bags) {
+ super();
+ keyType = pkgr.keyType;
+ numInputs = 1;
+        // All inputs to the combiner are treated as inner; size the array
+        // from the source Packager so the copy cannot overrun it.
+        inner = new boolean[pkgr.inner.length];
+        for (int i = 0; i < pkgr.inner.length; i++) {
+            inner[i] = true;
+        }
+        numBags = 0;
+        if (bags != null) {
+            mBags = Arrays.copyOf(bags, bags.length);
+            for (int i = 0; i < mBags.length; i++) {
+                if (mBags[i]) numBags++;
+            }
+        }
+ }
+
+ /**
+ * @param keyInfo the keyInfo to set
+ */
+ public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+ this.keyInfo = keyInfo;
+ // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+ // group case and not in cogroups. So there should only
+ // be one LocalRearrange from which we get the keyInfo for
+ // which field in the value is in the key. This LocalRearrange
+ // has an index of 0. When we do support combiner in Cogroups
+ // THIS WILL NEED TO BE REVISITED.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+                keyInfo.get(0); // assumption: only groups are "combinable", hence index 0
+ keyLookup = lrKeyInfo.second;
+ }
+
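+    /**
+     * Picks the bag implementation for packaged fields: a NonSpillableDataBag
+     * when pig.cachedbag.type is "default", otherwise an InternalCachedBag
+     * that shares the bag memory budget numBags ways.
+     */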
+ private DataBag createDataBag(int numBags) {
+ String bagType = null;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ }
+
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ return new NonSpillableDataBag();
+ }
+ return new InternalCachedBag(numBags);
+ }
+
+ @Override
+ public Result getNext() throws ExecException {
+        // Create a slot for each output column; bag columns get a fresh bag
+ Object[] fields = new Object[mBags.length];
+ for (int i = 0; i < mBags.length; i++) {
+ if (mBags[i]) fields[i] = createDataBag(numBags);
+ }
+
+        // For each tuple in the input bag, split it up and place its
+        // fields into the proper bags. If a given field isn't a bag, it
+        // is part of the key, so set the value as is.
+ for (Tuple tup : bags[0]) {
+ int tupIndex = 0; // an index for accessing elements from
+ // the value (tup) that we have currently
+ for(int i = 0; i < mBags.length; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if(keyIndex == null && mBags[i]) {
+ // the field for this index is not the
+ // key - so just take it from the "value"
+ // we were handed - Currently THIS HAS TO BE A BAG
+ // In future if this changes, THIS WILL NEED TO BE
+ // REVISITED.
+ ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+ tupIndex++;
+ } else {
+ // the field for this index is in the key
+ fields[i] = key;
+ }
+ }
+ }
+
+        // The successor of the CombinerPackager, as of now, SHOULD be a
+        // POForEach that has been adjusted to look for its inputs by
+        // projecting from the corresponding positions in the
+        // CombinerPackager output. So the key is NOT added to the result
+        // here; all the bags are merely put into a result tuple and returned.
+        Tuple res = mTupleFactory.newTuple(mBags.length);
+        for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+ }
+
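+    // The NullableTuple's index is ignored here; the combiner's value tuple
+    // is simply unwrapped.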
+ @Override
+ protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+ throws ExecException {
+ return (Tuple) ntup.getValueAsPigType();
+ }
+
+}
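
Both of the new Packager subclasses are driven through the same two-call
contract: attachInput(key, bags, readOnce) hands over one key's worth of
materialized input, and getNext() is then called repeatedly until it returns
STATUS_EOP. A minimal driver sketch under that assumption (illustrative only;
it assumes package-local access to attachInput, and collect() is a
hypothetical stand-in for the real output collector in POPackage and the
reduce pipeline):

    // Sketch only; not part of this commit.
    static void drainOneKey(Packager pkgr, Object key, DataBag[] bags,
            boolean[] readOnce) throws ExecException {
        pkgr.attachInput(key, bags, readOnce); // one key's materialized inputs
        while (true) {
            Result r = pkgr.getNext();         // one packaged tuple per call
            if (r.returnStatus == POStatus.STATUS_EOP) {
                break;                         // this key is exhausted
            }
            if (r.returnStatus == POStatus.STATUS_OK) {
                collect((Tuple) r.result);     // hypothetical collector
            }
        }
    }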
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+
+public class JoinPackager extends Packager {
+
+ private static final long serialVersionUID = 1L;
+
+ private POOptimizedForEach forEach;
+ private boolean newKey = true;
+ private Tuple res = null;
+ private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+ private boolean firstTime = true;
+ private boolean useDefaultBag = false;
+
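+    // Number of tuples of the last (streamed) input fed to the ForEach per
+    // attach.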
+ public static final String DEFAULT_CHUNK_SIZE = "1000";
+
+ private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);
+ private Result forEachResult;
+ private DataBag[] dbs = null;
+
+ private int lastBagIndex;
+
+ private Iterator<Tuple> lastBagIter;
+
+ public JoinPackager(Packager p, POForEach f) {
+ super();
+ String scope = f.getOperatorKey().getScope();
+ NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+ forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        if (p != null) {
+ setKeyType(p.getKeyType());
+ setNumInputs(p.getNumInputs());
+ lastBagIndex = numInputs - 1;
+ setInner(p.getInner());
+ setKeyInfo(p.getKeyInfo());
+ this.isKeyTuple = p.isKeyTuple;
+ this.isKeyCompound = p.isKeyCompound;
+ }
+        if (f != null) {
+ setInputPlans(f.getInputPlans());
+ setToBeFlattened(f.getToBeFlattened());
+ }
+ }
+
+    /**
+     * Calls getNext to get the next ForEach result. The input for a
+     * JoinPackager is a (key, NullableTuple) pair. The first n-1 inputs are
+     * materialized into bags, while input#n is fed to the delegated ForEach
+     * operator a chunk at a time: if input#n splits into k chunks, the
+     * ForEach input is (input#1, input#2, ..., input#n[i]) for i = 1..k.
+     * For every ForEach input, all the results are pulled from the ForEach.
+     * getNext will be called multiple times for a particular input; each
+     * call returns one ForEach output tuple, so internal state is kept to
+     * track where we are.
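+     *
+     * For example (illustrative only): joining A = {a1, a2} with
+     * B = {b1 ... b2500} on one key, with chunkSize = 1000, materializes A
+     * into a bag and streams B to the ForEach in chunks of 1000, 1000 and
+     * 500 tuples, so only one chunk of B is held in memory alongside A.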
+ */
+ @Override
+ public Result getNext() throws ExecException {
+
+ if(firstTime){
+ firstTime = false;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ useDefaultBag = true;
+ }
+ }
+ }
+
+ Tuple it = null;
+
+        // If we see a new key, materialize the first n-1 inputs into bags and
+        // construct the ForEach input tuple res = (key, input#1, ..., input#n).
+        // The only missing piece is input#n, which is filled in below, one
+        // chunk at a time, and fed to the ForEach.
+        if (newKey) {
+ // Put n-1 inputs into bags
+ dbs = new DataBag[numInputs];
+ for (int i = 0; i < numInputs - 1; i++) {
+ if (!readOnce[i]) {
+ dbs[i] = bags[i];
+ } else {
+                    // If a blocking POStream follows this JoinPackager in
+                    // the pipeline, the InternalCachedBag constructor
+                    // argument should be 2 * numInputs; but we don't pay
+                    // that penalty all the time for one obscure case.
+                    dbs[i] = useDefaultBag
+                            ? BagFactory.getInstance().newDefaultBag()
+                            : new InternalCachedBag(numInputs - 1);
+ dbs[i].addAll(bags[i]);
+ }
+ }
+
+            // For the last bag, we always use a NonSpillableDataBag.
+ dbs[lastBagIndex] = new NonSpillableDataBag((int)chunkSize);
+
+ lastBagIter = bags[lastBagIndex].iterator();
+
+            // If there is no tuple at all for input#n,
+            // no further processing is needed; return EOP
+ if (!lastBagIter.hasNext()) {
+                // We return here because there ought to be a flatten
+                // on this last input, and the empty bag should result
+                // in this key being dropped from the output
+ newKey = true;
+ return eopResult;
+ }
+
+ res = mTupleFactory.newTuple(numInputs+1);
+ for (int i = 0; i < dbs.length; i++)
+ res.set(i+1,dbs[i]);
+
+ res.set(0,key);
+ // if we have an inner anywhere and the corresponding
+ // bag is empty, we can just return
+ for (int i = 0; i < dbs.length - 1; i++) {
+ if(inner[i]&&dbs[i].size()==0){
+ detachInput();
+ return eopResult;
+ }
+ }
+ newKey = false;
+ }
+
+        // Keep attaching input chunks to the ForEach until either:
+        // 1. the ForEach produces an output tuple, or
+        // 2. there is no more of input#n
+ while (lastBagIter.hasNext() || forEach.processingPlan) {
+ // if a previous call to foreach.getNext()
+ // has still not returned all output, process it
+ while (forEach.processingPlan) {
+ forEachResult = forEach.getNextTuple();
+ switch (forEachResult.returnStatus) {
+ case POStatus.STATUS_OK:
+ case POStatus.STATUS_ERR:
+ return forEachResult;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ break;
+ }
+ }
+
+ if (lastBagIter.hasNext()) {
+                // Set up a bag of chunkSize tuples (or the remainder of
+                // the last input, if fewer) to feed to the ForEach
+ dbs[lastBagIndex].clear(); // clear last chunk
+ for (int i = 0; i < chunkSize && lastBagIter.hasNext(); i++) {
+ it = lastBagIter.next();
+ dbs[lastBagIndex].add(it);
+ }
+ } else {
+ detachInput();
+ return eopResult;
+ }
+
+ // Attach the input to forEach
+ forEach.attachInput(res);
+
+            // Pull an output tuple from the ForEach
+            forEachResult = forEach.getNextTuple();
+            switch (forEachResult.returnStatus) {
+            case POStatus.STATUS_OK:
+            case POStatus.STATUS_ERR:
+                return forEachResult;
+            case POStatus.STATUS_NULL:
+                continue;
+            case POStatus.STATUS_EOP:
+                break;
+            }
+ }
+ detachInput();
+ return eopResult;
+ }
+
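+    // Resets per-key state so the next call to getNext() re-materializes
+    // the bags for the new key.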
+ @Override
+ void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ super.attachInput(key, bags, readOnce);
+ this.newKey = true;
+    }
+
+ public List<PhysicalPlan> getInputPlans() {
+ return forEach.getInputPlans();
+ }
+
+ public void setInputPlans(List<PhysicalPlan> plans) {
+ forEach.setInputPlans(plans);
+ }
+
+ public void setToBeFlattened(List<Boolean> flattens) {
+ forEach.setToBeFlattened(flattens);
+ }
+
+ /**
+ * @return the forEach
+ */
+ public POOptimizedForEach getForEach() {
+ return forEach;
+ }
+
+ /**
+     * @param chunkSize the chunk size for the last (streamed, typically biggest) input
+ */
+ public void setChunkSize(long chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+}
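
The JoinPackager constructor above clones key metadata from an existing
Packager and adopts the input plans of the POForEach that follows the
package, so wiring one in amounts to building it from those two operators and
swapping it into the POPackage. A hedged sketch (the getPkgr()/setPkgr()
accessors on POPackage are assumptions; they are not shown in this hunk):

    // Sketch only; not part of this commit.
    JoinPackager jpkgr = new JoinPackager(pkg.getPkgr(), forEach);
    jpkgr.setChunkSize(1000); // tuples of the streamed last input per chunk
    pkg.setPkgr(jpkgr);       // replace the plain Packager with the join-aware one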