You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was lost in the plain-text conversion.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC
svn commit: r1045314 [1/5] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...
Author: yanz
Date: Mon Dec 13 19:11:00 2010
New Revision: 1045314
URL: http://svn.apache.org/viewvc?rev=1045314&view=rev
Log:
PIG-1712: ILLUSTRATE rework (yanz)
Added:
pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java
pig/trunk/src/org/apache/pig/pen/Illustrable.java
pig/trunk/src/org/apache/pig/pen/Illustrator.java
pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput.txt
pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput_invalid.txt
pig/trunk/test/org/apache/pig/test/data/illustrate.pig
pig/trunk/test/org/apache/pig/test/data/illustrate2.pig
pig/trunk/test/org/apache/pig/test/data/illustrate3.pig
pig/trunk/test/org/apache/pig/test/data/illustrate4.pig
pig/trunk/test/org/apache/pig/test/data/illustrate5.pig
pig/trunk/test/org/apache/pig/test/data/illustrate6.pig
Removed:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
pig/trunk/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/pen/physicalOperators/
pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
pig/trunk/test/org/apache/pig/test/TestPOCogroup.java
pig/trunk/test/org/apache/pig/test/TestPOCross.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
pig/trunk/src/org/apache/pig/data/TupleFactory.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java
pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java
pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
pig/trunk/test/org/apache/pig/test/TestStore.java
pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Dec 13 19:11:00 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1712: ILLUSTRATE rework (yanz)
+
PIG-1758: Deep cast of complex type (daijy)
PIG-1728: doc updates (chandec via olgan)
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Mon Dec 13 19:11:00 2010
@@ -1125,23 +1125,32 @@ public class PigServer {
return currDAG.getAliasOp().keySet();
}
- public Map<LogicalOperator, DataBag> getExamples(String alias) {
+ public Map<LogicalOperator, DataBag> getExamples(String alias) throws IOException {
LogicalPlan plan = null;
-
try {
- if (currDAG.isBatchOn()) {
+ if (currDAG.isBatchOn() && alias != null) {
currDAG.execute();
}
-
- plan = getClonedGraph().getPlan(alias);
+ Graph g = getClonedGraph();
+ plan = g.getPlan(alias);
+ plan = compileLp(plan, g, false);
} catch (IOException e) {
//Since the original script is parsed anyway, there should not be an
//error in this parsing. The only reason there can be an error is when
//the files being loaded in load don't exist anymore.
e.printStackTrace();
}
+
ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
- return exgen.getExamples();
+ try {
+ return exgen.getExamples();
+ } catch (ExecException e) {
+ e.printStackTrace(System.out);
+ throw new IOException("ExecException : "+ e.getMessage());
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ throw new IOException("Exception : "+ e.getMessage());
+ }
}
private LogicalPlan getStorePlan(String alias) throws IOException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Dec 13 19:11:00 2010
@@ -57,6 +57,8 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -69,17 +71,17 @@ import org.apache.pig.newplan.logical.Lo
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
-import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher.ProjectionFinder;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
-import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.pen.POOptimizeDisabler;
public class HExecutionEngine {
@@ -105,6 +107,11 @@ public class HExecutionEngine {
// map from LOGICAL key to into about the execution
protected Map<OperatorKey, MapRedResult> materializedResults;
+ protected Map<LogicalOperator, PhysicalOperator> logToPhyMap;
+ protected Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+ protected Map<Operator, PhysicalOperator> newLogToPhyMap;
+ private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerOpMap;
+
public HExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
@@ -255,8 +262,16 @@ public class HExecutionEngine {
// translate old logical plan to new plan
LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
visitor.visit();
+ opsMap = visitor.getOldToNewLOOpMap();
+ forEachInnerOpMap = visitor.getForEachInnerMap();
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+ if (pigContext.inIllustrator) {
+ // disable all PO-specific optimizations
+ POOptimizeDisabler pod = new POOptimizeDisabler(newPlan);
+ pod.visit();
+ }
+
SchemaResetter schemaResetter = new SchemaResetter(newPlan);
schemaResetter.visit();
@@ -271,6 +286,22 @@ public class HExecutionEngine {
throw new FrontendException(msg, errCode, PigException.BUG, ioe);
}
+ if (pigContext.inIllustrator) {
+ // disable MergeForEach in illustrator
+ if (optimizerRules == null)
+ optimizerRules = new HashSet<String>();
+ optimizerRules.add("MergeForEach");
+ optimizerRules.add("PartitionFilterOptimizer");
+ optimizerRules.add("LimitOptimizer");
+ optimizerRules.add("SplitFilter");
+ optimizerRules.add("PushUpFilter");
+ optimizerRules.add("MergeFilter");
+ optimizerRules.add("PushDownForEachFlatten");
+ optimizerRules.add("ColumnMapKeyPrune");
+ optimizerRules.add("AddForEach");
+ optimizerRules.add("GroupByConstParallelSetter");
+ }
+
// run optimizer
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer =
new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100, optimizerRules);
@@ -294,6 +325,7 @@ public class HExecutionEngine {
translator.setPigContext(pigContext);
translator.visit();
+ newLogToPhyMap = translator.getLogToPhyMap();
return translator.getPhysicalPlan();
}else{
@@ -303,13 +335,40 @@ public class HExecutionEngine {
translator.visit();
return translator.getPhysicalPlan();
}
- } catch (Exception ve) {
+ } catch (ExecException ve) {
int errCode = 2042;
String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false.";
throw new FrontendException(msg, errCode, PigException.BUG, ve);
}
}
+ public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+ if (logToPhyMap != null)
+ return logToPhyMap;
+ else if (newLogToPhyMap != null) {
+ Map<LogicalOperator, PhysicalOperator> result = new HashMap<LogicalOperator, PhysicalOperator>();
+ for (LogicalOperator lo: opsMap.keySet()) {
+ result.put(lo, newLogToPhyMap.get(opsMap.get(lo)));
+ }
+ return result;
+ } else
+ return null;
+ }
+
+ public Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
+ Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> result =
+ new HashMap<LOForEach, Map<LogicalOperator, PhysicalOperator>>();
+ for (Map.Entry<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> entry :
+ forEachInnerOpMap.entrySet()) {
+ Map<LogicalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalOperator, PhysicalOperator>();
+ for (Map.Entry<LogicalOperator, LogicalRelationalOperator> innerEntry : entry.getValue().entrySet()) {
+ innerOpMap.put(innerEntry.getKey(), newLogToPhyMap.get(innerEntry.getValue()));
+ }
+ result.put(entry.getKey(), innerOpMap);
+ }
+ return result;
+ }
+
public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
public SortInfoSetter(OperatorPlan plan) throws FrontendException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Dec 13 19:11:00 2010
@@ -174,7 +174,10 @@ public class JobControlCompiler{
UDFContext.getUDFContext().reset();
}
- Map<Job, MapReduceOper> getJobMroMap() {
+ /**
+ * Gets the map of Job and the MR Operator
+ */
+ public Map<Job, MapReduceOper> getJobMroMap() {
return Collections.unmodifiableMap(jobMroMap);
}
@@ -378,19 +381,23 @@ public class JobControlCompiler{
inpTargets.add(ldSucKeys);
inpSignatureLists.add(ld.getSignature());
//Remove the POLoad from the plan
- mro.mapPlan.remove(ld);
+ if (!pigContext.inIllustrator)
+ mro.mapPlan.remove(ld);
}
}
- //Create the jar of all functions and classes required
- File submitJarFile = File.createTempFile("Job", ".jar");
- // ensure the job jar is deleted on exit
- submitJarFile.deleteOnExit();
- FileOutputStream fos = new FileOutputStream(submitJarFile);
- JarManager.createJar(fos, mro.UDFs, pigContext);
+ if (!pigContext.inIllustrator)
+ {
+ //Create the jar of all functions and classes required
+ File submitJarFile = File.createTempFile("Job", ".jar");
+ // ensure the job jar is deleted on exit
+ submitJarFile.deleteOnExit();
+ FileOutputStream fos = new FileOutputStream(submitJarFile);
+ JarManager.createJar(fos, mro.UDFs, pigContext);
- //Start setting the JobConf properties
- conf.set("mapred.jar", submitJarFile.getPath());
+ //Start setting the JobConf properties
+ conf.set("mapred.jar", submitJarFile.getPath());
+ }
conf.set("pig.inputs", ObjectSerializer.serialize(inp));
conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
@@ -457,22 +464,23 @@ public class JobControlCompiler{
POStore st;
if (reduceStores.isEmpty()) {
st = mapStores.get(0);
- mro.mapPlan.remove(st);
+ if(!pigContext.inIllustrator)
+ mro.mapPlan.remove(st);
}
else {
st = reduceStores.get(0);
- mro.reducePlan.remove(st);
+ if(!pigContext.inIllustrator)
+ mro.reducePlan.remove(st);
}
// set out filespecs
String outputPath = st.getSFile().getFileName();
- FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
conf.set("pig.streaming.log.dir",
new Path(outputPath, LOG_DIR).toString());
conf.set("pig.streaming.task.output.dir", outputPath);
}
- else { // multi store case
+ else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
log.info("Setting up multi store job");
String tmpLocationStr = FileLocalizer
.getTemporaryPath(pigContext).toString();
@@ -513,7 +521,8 @@ public class JobControlCompiler{
//MapOnly Job
nwJob.setMapperClass(PigMapOnly.Map.class);
nwJob.setNumReduceTasks(0);
- conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ if(!pigContext.inIllustrator)
+ conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
if(mro.isEndOfAllInputSetInMap()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
@@ -535,7 +544,8 @@ public class JobControlCompiler{
log.info("Setting identity combiner class.");
}
pack = (POPackage)mro.reducePlan.getRoots().get(0);
- mro.reducePlan.remove(pack);
+ if(!pigContext.inIllustrator)
+ mro.reducePlan.remove(pack);
nwJob.setMapperClass(PigMapReduce.Map.class);
nwJob.setReducerClass(PigMapReduce.Reduce.class);
@@ -550,21 +560,24 @@ public class JobControlCompiler{
if (mro.customPartitioner != null)
nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
- conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ if(!pigContext.inIllustrator)
+ conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
if(mro.isEndOfAllInputSetInMap()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
// The pipeline is rerun only if there was a stream or merge-join.
conf.set(END_OF_INP_IN_MAP, "true");
}
- conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+ if(!pigContext.inIllustrator)
+ conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
if(mro.isEndOfAllInputSetInReduce()) {
// this is used in Map.close() to decide whether the
// pipeline needs to be rerun one more time in the close()
// The pipeline is rerun only if there was a stream
conf.set("pig.stream.in.reduce", "true");
}
- conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+ if (!pigContext.inIllustrator)
+ conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType()));
if (mro.getUseSecondaryKey()) {
@@ -631,9 +644,14 @@ public class JobControlCompiler{
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
}
- // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
- for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
- for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+ if (!pigContext.inIllustrator)
+ {
+ // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
+ for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
+ for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+ conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+ conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
+ }
// tmp file compression setups
if (Utils.tmpFileCompression(pigContext)) {
@@ -641,8 +659,6 @@ public class JobControlCompiler{
conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
}
- conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
- conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
String tmp;
long maxCombinedSplitSize = 0;
if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
@@ -661,7 +677,6 @@ public class JobControlCompiler{
UDFContext.getUDFContext().serialize(conf);
Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
-
return cjob;
} catch (JobCreationException jce) {
@@ -1142,7 +1157,7 @@ public class JobControlCompiler{
PigContext pigContext, Configuration conf, String filename,
String prefix) throws IOException {
- if (!FileLocalizer.fileExists(filename, pigContext)) {
+ if (!pigContext.inIllustrator && !FileLocalizer.fileExists(filename, pigContext)) {
throw new IOException(
"Internal error: skew join partition file "
+ filename + " does not exist");
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Dec 13 19:11:00 2010
@@ -109,7 +109,6 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
-import org.mortbay.util.URIUtil;
/**
* The compiler that compiles a given physical plan
@@ -311,6 +310,7 @@ public class MRCompiler extends PhyPlanV
public MROperPlan compile() throws IOException, PlanException, VisitorException {
List<PhysicalOperator> leaves = plan.getLeaves();
+ if (!pigContext.inIllustrator)
for (PhysicalOperator op : leaves) {
if (!(op instanceof POStore)) {
int errCode = 2025;
@@ -324,8 +324,14 @@ public class MRCompiler extends PhyPlanV
// and compile their plans
List<POStore> stores = PlanHelper.getStores(plan);
List<PONative> nativeMRs= PlanHelper.getNativeMRs(plan);
- List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
- ops.addAll(stores);
+ List<PhysicalOperator> ops;
+ if (!pigContext.inIllustrator) {
+ ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
+ ops.addAll(stores);
+ } else {
+ ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeMRs.size());
+ ops.addAll(leaves);
+ }
ops.addAll(nativeMRs);
Collections.sort(ops);
@@ -1005,16 +1011,23 @@ public class MRCompiler extends PhyPlanV
if (!mro.isMapDone()) {
// if map plan is open, add a limit for optimization, eventually we
// will add another limit to reduce plan
- mro.mapPlan.addAsLeaf(op);
- mro.setMapDone(true);
+ if (!pigContext.inIllustrator)
+ {
+ mro.mapPlan.addAsLeaf(op);
+ mro.setMapDone(true);
+ }
if (mro.reducePlan.isEmpty())
{
simpleConnectMapToReduce(mro);
mro.requestedParallelism = 1;
- POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit2.setLimit(op.getLimit());
- mro.reducePlan.addAsLeaf(pLimit2);
+ if (!pigContext.inIllustrator) {
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit2.setLimit(op.getLimit());
+ mro.reducePlan.addAsLeaf(pLimit2);
+ } else {
+ mro.reducePlan.addAsLeaf(op);
+ }
}
else
{
@@ -1848,6 +1861,7 @@ public class MRCompiler extends PhyPlanV
curMROp.reducePlan.addAsLeaf(nfe1);
curMROp.setNeedsDistinctCombiner(true);
phyToMROpMap.put(op, curMROp);
+ curMROp.phyToMRMap.put(op, nfe1);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -2221,12 +2235,13 @@ public class MRCompiler extends PhyPlanV
POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps2,flattened);
mro.reducePlan.add(nfe1);
mro.reducePlan.connect(pkg, nfe1);
-
+ mro.phyToMRMap.put(sort, nfe1);
if (limit!=-1)
{
POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit2.setLimit(limit);
mro.reducePlan.addAsLeaf(pLimit2);
+ mro.phyToMRMap.put(sort, pLimit2);
}
// ep1.add(innGen);
@@ -2649,7 +2664,7 @@ public class MRCompiler extends PhyPlanV
POPackage pack = (POPackage)op;
List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(pack);
- if (sucs.size()!=1) {
+ if (sucs == null || sucs.size()!=1) {
return;
}
@@ -2739,12 +2754,12 @@ public class MRCompiler extends PhyPlanV
{
// Now we can optimize the map-reduce plan
// Replace POPackage->POForeach to POJoinPackage
- replaceWithPOJoinPackage(mr.reducePlan, pack, forEach, chunkSize);
+ replaceWithPOJoinPackage(mr.reducePlan, mr, pack, forEach, chunkSize);
}
}
}
- public static void replaceWithPOJoinPackage(PhysicalPlan plan,
+ public static void replaceWithPOJoinPackage(PhysicalPlan plan, MapReduceOper mr,
POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
String scope = pack.getOperatorKey().scope;
NodeIdGenerator nig = NodeIdGenerator.getGenerator();
@@ -2772,7 +2787,7 @@ public class MRCompiler extends PhyPlanV
String msg = "Error rewriting POJoinPackage.";
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
-
+ mr.phyToMRMap.put(forEach, joinPackage);
LogFactory.
getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
}
@@ -2800,6 +2815,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG);
}
PhysicalOperator mpLeaf = mpLeaves.get(0);
+ if (!pigContext.inIllustrator)
if (!(mpLeaf instanceof POStore)) {
int errCode = 2025;
String msg = "Expected leaf of reduce plan to " +
@@ -2885,6 +2901,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG);
}
PhysicalOperator mpLeaf = mpLeaves.get(0);
+ if (!pigContext.inIllustrator)
if (!(mpLeaf instanceof POStore)) {
int errCode = 2025;
String msg = "Expected leaf of reduce plan to " +
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Dec 13 19:11:00 2010
@@ -461,7 +461,7 @@ public class MapReduceLauncher extends L
}
}
- private MROperPlan compile(
+ public MROperPlan compile(
PhysicalPlan php,
PigContext pc) throws PlanException, IOException, VisitorException {
MRCompiler comp = new MRCompiler(php, pc);
@@ -479,7 +479,7 @@ public class MapReduceLauncher extends L
//String prop = System.getProperty("pig.exec.nocombiner");
String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
- if (!("true".equals(prop))) {
+ if (!pc.inIllustrator && !("true".equals(prop))) {
CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
co.visit();
//display the warning message(s) from the CombinerOptimizer
@@ -493,7 +493,7 @@ public class MapReduceLauncher extends L
// Optimize to use secondary sort key if possible
prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
- if (!("true".equals(prop))) {
+ if (!pc.inIllustrator && !("true".equals(prop))) {
SecondaryKeyOptimizer skOptimizer = new SecondaryKeyOptimizer(plan);
skOptimizer.visit();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.util.HashSet;
import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -146,6 +148,10 @@ public class MapReduceOper extends Opera
// are NOT combinable for correctness.
private boolean combineSmallSplits = true;
+ // Map of the physical operator in physical plan to the one in MR plan: only needed
+ // if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
+ public Map<PhysicalOperator, PhysicalOperator> phyToMRMap;
+
private static enum OPER_FEATURE {
NONE,
// Indicate if this job is a sampling job
@@ -169,6 +175,7 @@ public class MapReduceOper extends Opera
scalars = new HashSet<PhysicalOperator>();
nig = NodeIdGenerator.getGenerator();
scope = k.getScope();
+ phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
}
/*@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Mon Dec 13 19:11:00 2010
@@ -276,13 +276,6 @@ public class PhyPlanSetter extends PhyPl
stream.setParentPlan(parent);
}
- @Override
- public void visitLocalRearrangeForIllustrate(
- POLocalRearrangeForIllustrate lrfi) throws VisitorException {
- super.visitLocalRearrangeForIllustrate(lrfi);
- lrfi.setParentPlan(parent);
- }
-
/*
@Override
public void visitPartitionRearrange(POPartitionRearrange lrfi) throws VisitorException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Dec 13 19:11:00 2010
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -40,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +52,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.tools.pigstats.PigStatusReporter;
public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
@@ -58,13 +63,15 @@ public abstract class PigMapBase extends
protected byte keyType;
//Map Plan
- protected PhysicalPlan mp;
+ protected PhysicalPlan mp = null;
// Store operators
protected List<POStore> stores;
protected TupleFactory tf = TupleFactory.getInstance();
+ boolean inIllustrator = false;
+
Context outputCollector;
// Reporter that will be used by operators
@@ -81,6 +88,14 @@ public abstract class PigMapBase extends
private volatile boolean initialized = false;
/**
+ * for local map/reduce simulation
+ * @param plan the map plan
+ */
+ public void setMapPlan(PhysicalPlan plan) {
+ mp = plan;
+ }
+
+ /**
* Will be called when all the tuples in the input
* are done. So reporter thread should be closed.
*/
@@ -142,14 +157,16 @@ public abstract class PigMapBase extends
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
PigMapReduce.sJobContext = context;
PigMapReduce.sJobConf = context.getConfiguration();
+ inIllustrator = (context instanceof IllustratorContext);
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
- mp = (PhysicalPlan) ObjectSerializer.deserialize(
- job.get("pig.mapPlan"));
+ if (mp == null)
+ mp = (PhysicalPlan) ObjectSerializer.deserialize(
+ job.get("pig.mapPlan"));
stores = PlanHelper.getStores(mp);
// To be removed
@@ -207,7 +224,8 @@ public abstract class PigMapBase extends
MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
store.setStoreImpl(impl);
- store.setUp();
+ if (!pigContext.inIllustrator)
+ store.setUp();
}
boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
@@ -225,7 +243,13 @@ public abstract class PigMapBase extends
}
for (PhysicalOperator root : roots) {
- root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+ if (inIllustrator) {
+ if (root != null) {
+ root.attachInput(inpTuple);
+ }
+ } else {
+ root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+ }
}
runPipeline(leaf);
@@ -284,4 +308,76 @@ public abstract class PigMapBase extends
this.keyType = keyType;
}
+ /**
+ *
+ * Get mapper's illustrator context
+ *
+ * @param conf Configuration
+ * @param input Input bag to serve as data source
+ * @param output Map output buffer
+ * @param split the split
+ * @return Illustrator's context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public Context getIllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+ throws IOException, InterruptedException {
+ return new IllustratorContext(conf, input, output, split);
+ }
+
+ public class IllustratorContext extends Context {
+ private DataBag input;
+ List<Pair<PigNullableWritable, Writable>> output;
+ private Iterator<Tuple> it = null;
+ private Tuple value = null;
+ private boolean init = false;
+
+ public IllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output,
+ InputSplit split) throws IOException, InterruptedException {
+ super(conf, new TaskAttemptID(), null, null, null, null, split);
+ if (output == null)
+ throw new IOException("Null output can not be used");
+ this.input = input; this.output = output;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (input == null) {
+ if (!init) {
+ init = true;
+ return true;
+ }
+ return false;
+ }
+ if (it == null)
+ it = input.iterator();
+ if (!it.hasNext())
+ return false;
+ value = it.next();
+ return true;
+ }
+
+ @Override
+ public Text getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public void write(PigNullableWritable key, Writable value)
+ throws IOException, InterruptedException {
+ output.add(new Pair<PigNullableWritable, Writable>(key, value));
+ }
+
+ @Override
+ public void progress() {
+
+ }
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Dec 13 19:11:00 2010
@@ -18,18 +18,24 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.log4j.PropertyConfigurator;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -43,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -55,6 +62,7 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -111,8 +119,8 @@ public class PigMapReduce {
// value. The value needs it so that POPackage can properly
// assign the tuple to its slot in the projection.
key.setIndex(index);
- val.setIndex(index);
-
+ val.setIndex(index);
+
oc.write(key, val);
}
}
@@ -194,7 +202,6 @@ public class PigMapReduce {
// set the partition
wrappedKey.setPartition(partitionIndex);
val.setIndex(index);
-
oc.write(wrappedKey, val);
}
@@ -254,7 +261,7 @@ public class PigMapReduce {
protected final Log log = LogFactory.getLog(getClass());
//The reduce plan
- protected PhysicalPlan rp;
+ protected PhysicalPlan rp = null;
// Store operators
protected List<POStore> stores;
@@ -279,6 +286,16 @@ public class PigMapReduce {
PigContext pigContext = null;
protected volatile boolean initialized = false;
+ private boolean inIllustrator = false;
+
+ /**
+ * Set the reduce plan: to be used by local runner for illustrator
+ * @param plan Reduce plan
+ */
+ public void setReducePlan(PhysicalPlan plan) {
+ rp = plan;
+ }
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
@@ -287,7 +304,9 @@ public class PigMapReduce {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
-
+ inIllustrator = (context instanceof IllustratorContext);
+ if (inIllustrator)
+ pack = ((IllustratorContext) context).pack;
Configuration jConf = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
sJobContext = context;
@@ -296,11 +315,13 @@ public class PigMapReduce {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
- rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
- .get("pig.reducePlan"));
+ if (rp == null)
+ rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
+ .get("pig.reducePlan"));
stores = PlanHelper.getStores(rp);
- pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
+ if (!inIllustrator)
+ pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
// To be removed
if(rp.isEmpty())
log.debug("Reduce Plan empty!");
@@ -352,12 +373,13 @@ public class PigMapReduce {
PhysicalOperator.setPigLogger(pigHadoopLogger);
- for (POStore store: stores) {
- MapReducePOStoreImpl impl
- = new MapReducePOStoreImpl(context);
- store.setStoreImpl(impl);
- store.setUp();
- }
+ if (!inIllustrator)
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(context);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
}
// In the case we optimize the join, we combine
@@ -512,6 +534,127 @@ public class PigMapReduce {
PhysicalOperator.setReporter(null);
initialized = false;
}
+
+ /**
+ * Get reducer's illustrator context
+ *
+ * @param input Input buffer as output by maps
+ * @param pkg package
+ * @return reducer's illustrator context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public Context getIllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+ return new IllustratorContext(job, input, pkg);
+ }
+
+ @SuppressWarnings("unchecked")
+ public class IllustratorContext extends Context {
+ private PigNullableWritable currentKey = null, nextKey = null;
+ private NullableTuple nextValue = null;
+ private List<NullableTuple> currentValues = null;
+ private Iterator<Pair<PigNullableWritable, Writable>> it;
+ private final ByteArrayOutputStream bos;
+ private final DataOutputStream dos;
+ private final RawComparator sortComparator, groupingComparator;
+ POPackage pack = null;
+
+ public IllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input,
+ POPackage pkg
+ ) throws IOException, InterruptedException {
+ super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+ null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
+ bos = new ByteArrayOutputStream();
+ dos = new DataOutputStream(bos);
+ org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+ sortComparator = nwJob.getSortComparator();
+ groupingComparator = nwJob.getGroupingComparator();
+
+ Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+ @Override
+ public int compare(Pair<PigNullableWritable, Writable> o1,
+ Pair<PigNullableWritable, Writable> o2) {
+ try {
+ o1.first.write(dos);
+ int l1 = bos.size();
+ o2.first.write(dos);
+ int l2 = bos.size();
+ byte[] bytes = bos.toByteArray();
+ bos.reset();
+ return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+ } catch (IOException e) {
+ throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
+ }
+ }
+ }
+ );
+ currentValues = new ArrayList<NullableTuple>();
+ it = input.iterator();
+ if (it.hasNext()) {
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ }
+ pack = pkg;
+ }
+
+ @Override
+ public PigNullableWritable getCurrentKey() {
+ return currentKey;
+ }
+
+ @Override
+ public boolean nextKey() {
+ if (nextKey == null)
+ return false;
+ currentKey = nextKey;
+ currentValues.clear();
+ currentValues.add(nextValue);
+ nextKey = null;
+ for(; it.hasNext(); ) {
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ /* Why can't raw comparison be used?
+ byte[] bytes;
+ int l1, l2;
+ try {
+ currentKey.write(dos);
+ l1 = bos.size();
+ entry.first.write(dos);
+ l2 = bos.size();
+ bytes = bos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException("nextKey exception : "+e.getMessage());
+ }
+ bos.reset();
+ if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+ */
+ if (groupingComparator.compare(currentKey, entry.first) == 0)
+ {
+ currentValues.add((NullableTuple)entry.second);
+ } else {
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ break;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Iterable<NullableTuple> getValues() {
+ return currentValues;
+ }
+
+ @Override
+ public void write(PigNullableWritable k, Writable t) {
+ }
+
+ @Override
+ public void progress() {
+ }
+ }
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Dec 13 19:11:00 2010
@@ -91,6 +91,10 @@ public class LogToPhyTranslationVisitor
return currentPlan;
}
+ public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+ return logToPhyMap;
+ }
+
@Override
protected void visit(LOGreaterThan op) throws VisitorException {
String scope = op.getOperatorKey().scope;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Mon Dec 13 19:11:00 2010
@@ -35,6 +35,8 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.Illustrable;
/**
*
@@ -58,7 +60,7 @@ import org.apache.pig.pen.util.LineageTr
* only those types that are supported.
*
*/
-public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Cloneable {
+public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Illustrable, Cloneable {
private Log log = LogFactory.getLog(getClass());
@@ -125,8 +127,14 @@ public abstract class PhysicalOperator e
static final protected Map dummyMap = null;
+ // TODO: This is not needed. But a lot of tests check serialized physical plans
+ // that are sensitive to the serialized image of the contained physical operators.
+ // So for now, just keep it. Later it'll be cleansed along with those test golden
+ // files
protected LineageTracer lineageTracer;
+ protected transient Illustrator illustrator = null;
+
private boolean accum;
private transient boolean accumStart;
@@ -149,8 +157,13 @@ public abstract class PhysicalOperator e
res = new Result();
}
- public void setLineageTracer(LineageTracer lineage) {
- this.lineageTracer = lineage;
+ @Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ }
+
+ public Illustrator getIllustrator() {
+ return illustrator;
}
public int getRequestedParallelism() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java Mon Dec 13 19:11:00 2010
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
/**
@@ -66,4 +67,11 @@ public abstract class BinaryComparisonOp
operandType = op.operandType;
super.cloneHelper(op);
}
+
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ if (illustrator != null) {
+ illustrator.setSubExpResult(eqClassIndex == 0);
+ }
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java Mon Dec 13 19:11:00 2010
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
/**
* A base class for all Binary expression operators.
@@ -84,4 +86,9 @@ public abstract class BinaryExpressionOp
rhs = op.rhs;
super.cloneHelper(op);
}
-}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
+ }
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Mon Dec 13 19:11:00 2010
@@ -204,4 +204,9 @@ public class ConstantExpression extends
public List<ExpressionOperator> getChildExpressions() {
return null;
}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -185,7 +185,7 @@ public class EqualToExpr extends BinaryC
}else{
throw new ExecException("The left side and right side has the different types");
}
-
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Mon Dec 13 19:11:00 2010
@@ -35,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.Illustrator;
/**
* A base class for all types of expressions. All expression
@@ -55,6 +56,11 @@ public abstract class ExpressionOperator
}
@Override
+ public void setIllustrator(Illustrator illustrator) {
+ this.illustrator = illustrator;
+ }
+
+ @Override
public boolean supportsMultipleOutputs() {
return false;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class GTOrEqualToExpr extends Bin
} else {
left.result = falseRef;
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Mon Dec 13 19:11:00 2010
@@ -17,15 +17,11 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -37,7 +33,6 @@ public class GreaterThanExpr extends Bin
*
*/
private static final long serialVersionUID = 1L;
- transient private final Log log = LogFactory.getLog(getClass());
public GreaterThanExpr(OperatorKey k) {
this(k, -1);
@@ -60,7 +55,6 @@ public class GreaterThanExpr extends Bin
@Override
public Result getNext(Boolean bool) throws ExecException {
- byte status;
Result left, right;
switch (operandType) {
@@ -154,6 +148,7 @@ public class GreaterThanExpr extends Bin
} else {
left.result = falseRef;
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class LTOrEqualToExpr extends Bin
} else {
left.result = falseRef;
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class LessThanExpr extends Binary
} else {
left.result = falseRef;
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -184,6 +184,7 @@ public class NotEqualToExpr extends Bina
}else{
throw new ExecException("The left side and right side has the different types");
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Mon Dec 13 19:11:00 2010
@@ -78,9 +78,20 @@ public class POAnd extends BinaryCompari
// 3) f f f f
// Short circuit - if lhs is false, return false; ROW 3 above is handled with this
- if (left.result != null && !(((Boolean)left.result).booleanValue())) return left;
+ boolean returnLeft = false;
+ if (left.result != null && !(((Boolean)left.result).booleanValue())) {
+ if (illustrator == null) {
+ return left;
+ }
+ illustratorMarkup(null, left.result, 1);
+ returnLeft = true;
+ }
Result right = rhs.getNext(dummyBool);
+ if (returnLeft) {
+ return left;
+ }
+
// pass on ERROR and EOP
if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
return right;
@@ -94,6 +105,8 @@ public class POAnd extends BinaryCompari
// No matter what, what we get from the right side is what we'll
// return, null, true, or false.
+ if (right.result != null)
+ illustratorMarkup(null, right.result, (Boolean) right.result ? 0 : 1);
return right;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Mon Dec 13 19:11:00 2010
@@ -32,6 +32,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
public class POBinCond extends ExpressionOperator {
@@ -65,7 +66,9 @@ public class POBinCond extends Expressio
Result res = cond.getNext(b);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@@ -88,7 +91,9 @@ public class POBinCond extends Expressio
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -109,7 +114,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -130,7 +137,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -151,7 +160,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -171,7 +182,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -192,7 +205,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -213,7 +228,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -234,7 +251,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -255,7 +274,9 @@ public class POBinCond extends Expressio
}
Result res = cond.getNext(dummyBool);
if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
- return ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
+ Result result = ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
+ illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+ return result;
}
@Override
@@ -338,4 +359,11 @@ public class POBinCond extends Expressio
return child;
}
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ if(illustrator != null) {
+
+ }
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Mon Dec 13 19:11:00 2010
@@ -1327,4 +1327,8 @@ public class POCast extends ExpressionOp
return funcSpec;
}
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java Mon Dec 13 19:11:00 2010
@@ -76,6 +76,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.DOUBLE:
@@ -86,6 +87,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.INTEGER:
@@ -96,6 +98,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.CHARARRAY:
@@ -106,6 +109,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.BOOLEAN:
@@ -116,6 +120,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.LONG:
@@ -126,6 +131,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.FLOAT:
@@ -136,6 +142,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.MAP:
@@ -146,6 +153,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.TUPLE:
@@ -156,6 +164,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
case DataType.BAG:
@@ -166,6 +175,7 @@ public class POIsNull extends UnaryCompa
} else {
res.result = false;
}
+ illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
}
return res;
default: {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java Mon Dec 13 19:11:00 2010
@@ -165,6 +165,8 @@ public class POMapLookUp extends Express
return null;
}
-
-
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
@@ -101,4 +102,9 @@ public class PONegative extends UnaryExp
clone.cloneHelper(this);
return clone;
}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return (Tuple) out;
+ }
}