You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/23 21:10:34 UTC
svn commit: r988256 [1/3] - in /hadoop/pig/trunk: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/lo...
Author: daijy
Date: Mon Aug 23 19:10:32 2010
New Revision: 988256
URL: http://svn.apache.org/viewvc?rev=988256&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-7.patch)
Added:
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LoadTypeCastInserter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/StreamTypeCastInserter.java
Removed:
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune2.java
Modified:
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
hadoop/pig/trunk/test/newlogicalplan-tests
hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Aug 23 19:10:32 2010
@@ -884,7 +884,7 @@ public class PigServer {
}
PhysicalPlan pp = compilePp(lp);
lp.explain(lps, format, verbose);
- if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("true") ) {
+ if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("true") ) {
LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
migrator.visit();
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
@@ -1269,7 +1269,7 @@ public class PigServer {
validate(lp, collector, isBeforeOptimizer);
// optimize
- if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
+ if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false")) {
HashSet<String> optimizerRules = null;
try {
optimizerRules = (HashSet<String>) ObjectSerializer
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Aug 23 19:10:32 2010
@@ -234,12 +234,13 @@ public class HExecutionEngine {
}
try {
- if (getConfiguration().getProperty("pig.usenewlogicalplan", "false").equals("true")) {
+ if (getConfiguration().getProperty("pig.usenewlogicalplan", "true").equals("true")) {
log.info("pig.usenewlogicalplan is set to true. New logical plan will be used.");
// translate old logical plan to new plan
LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
visitor.visit();
+ visitor.finish();
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
SchemaResetter schemaResetter = new SchemaResetter(newPlan);
@@ -267,6 +268,7 @@ public class HExecutionEngine {
translator.setPigContext(pigContext);
translator.visit();
+ translator.finish();
return translator.getPhysicalPlan();
}else{
@@ -279,7 +281,7 @@ public class HExecutionEngine {
}
} catch (Exception ve) {
int errCode = 2042;
- String msg = "Internal error. Unable to translate logical plan to physical plan.";
+ String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false.";
throw new ExecException(msg, errCode, PigException.BUG, ve);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Aug 23 19:10:32 2010
@@ -2715,23 +2715,30 @@ public class MRCompiler extends PhyPlanV
private void fixProjectionAfterLimit(MapReduceOper mro,
MapReduceOper sortMROp) throws PlanException, VisitorException {
- PhysicalOperator op = sortMROp.reducePlan.getLeaves().get(0);
+ PhysicalOperator op = sortMROp.reducePlan.getRoots().get(0);
+ assert(op instanceof POPackage);
+
+ op = sortMROp.reducePlan.getSuccessors(op).get(0);
+ assert(op instanceof POForEach);
while (true) {
- List<PhysicalOperator> preds = sortMROp.reducePlan
- .getPredecessors(op);
- op = preds.get(0);
- if (op instanceof POLimit) break;
+ List<PhysicalOperator> succs = sortMROp.reducePlan
+ .getSuccessors(op);
+ if (succs==null) break;
+ op = succs.get(0);
+ if (op instanceof POForEach) break;
}
while (true) {
- List<PhysicalOperator> succes = sortMROp.reducePlan
+ if (op instanceof POStore) break;
+ PhysicalOperator opToMove = op;
+ List<PhysicalOperator> succs = sortMROp.reducePlan
.getSuccessors(op);
- PhysicalOperator succ = succes.get(0);
- if (succ instanceof POStore) break;
-
- sortMROp.reducePlan.removeAndReconnect(succ);
- mro.reducePlan.addAsLeaf(succ);
+ op = succs.get(0);
+
+ sortMROp.reducePlan.removeAndReconnect(opToMove);
+ mro.reducePlan.addAsLeaf(opToMove);
+
}
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.pig.impl.logicalLayer.ExpressionOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -36,10 +37,12 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -53,10 +56,11 @@ public class ForeachInnerPlanVisitor ext
private org.apache.pig.newplan.logical.relational.LogicalRelationalOperator gen;
private int inputNo;
private HashMap<LogicalOperator, LogicalRelationalOperator> innerOpsMap;
+ private Map<LogicalExpression, LogicalOperator> scalarAliasMap = new HashMap<LogicalExpression, LogicalOperator>();
public ForeachInnerPlanVisitor(org.apache.pig.newplan.logical.relational.LOForEach foreach, LOForEach oldForeach, LogicalPlan innerPlan,
- LogicalPlan oldLogicalPlan) throws FrontendException {
- super(innerPlan, foreach, oldLogicalPlan);
+ LogicalPlan oldLogicalPlan, Map<LogicalExpression, LogicalOperator> scalarMap) throws FrontendException {
+ super(innerPlan, oldForeach, foreach, oldLogicalPlan, scalarMap);
newInnerPlan = foreach.getInnerPlan();
// get next inputNo
@@ -71,6 +75,7 @@ public class ForeachInnerPlanVisitor ext
this.oldForeach = oldForeach;
innerOpsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+ scalarAliasMap = scalarMap;
}
private void translateInnerPlanConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) throws FrontendException {
@@ -92,11 +97,11 @@ public class ForeachInnerPlanVisitor ext
}
}
- private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
+ private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalOperator oldOp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
PlanWalker<LogicalOperator, LogicalPlan> childWalker =
new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
- LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, outerPlan);
+ LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, oldOp, op, outerPlan, scalarAliasMap);
childWalker.walk(childPlanVisitor);
return childPlanVisitor.exprPlan;
@@ -186,7 +191,7 @@ public class ForeachInnerPlanVisitor ext
}
for (LogicalPlan sortPlan : sortPlans) {
- LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, newSort, mPlan);
+ LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, sort, newSort, mPlan);
newSortPlans.add(newSortPlan);
}
}
@@ -228,7 +233,7 @@ public class ForeachInnerPlanVisitor ext
newFilter.setAlias(filter.getAlias());
newFilter.setRequestedParallelism(filter.getRequestedParallelism());
- LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), newFilter, mPlan);
+ LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), filter, newFilter, mPlan);
newFilter.setFilterPlan(newFilterPlan);
newInnerPlan.add(newFilter);
innerOpsMap.put(filter, newFilter);
@@ -238,4 +243,43 @@ public class ForeachInnerPlanVisitor ext
throw new VisitorException(e);
}
}
+
+ public void visit(LOForEach foreach) throws VisitorException {
+ org.apache.pig.newplan.logical.relational.LOForEach newForEach =
+ new org.apache.pig.newplan.logical.relational.LOForEach(newInnerPlan);
+
+ newForEach.setAlias(foreach.getAlias());
+ newForEach.setRequestedParallelism(foreach.getRequestedParallelism());
+
+ org.apache.pig.newplan.logical.relational.LogicalPlan newForEachInnerPlan
+ = new org.apache.pig.newplan.logical.relational.LogicalPlan();
+ newForEach.setInnerPlan(newForEachInnerPlan);
+ List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+ boolean[] flattens = new boolean[foreach.getForEachPlans().size()];
+ LOGenerate generate = new LOGenerate(newForEachInnerPlan, expPlans, flattens);
+ newForEachInnerPlan.add(generate);
+
+ for (int i=0;i<foreach.getForEachPlans().size();i++) {
+ LogicalPlan innerPlan = foreach.getForEachPlans().get(i);
+ // Assume only one project is allowed in this level of foreach
+ LOProject project = (LOProject)innerPlan.iterator().next();
+
+ LOInnerLoad innerLoad = new LOInnerLoad(newForEachInnerPlan,
+ newForEach, project.isStar()?-1:project.getCol());
+ newForEachInnerPlan.add(innerLoad);
+ newForEachInnerPlan.connect(innerLoad, generate);
+ LogicalExpressionPlan expPlan = new LogicalExpressionPlan();
+ expPlans.add(expPlan);
+ ProjectExpression pe = new ProjectExpression(expPlan, i, -1, generate);
+ expPlan.add(pe);
+ }
+
+ newInnerPlan.add(newForEach);
+ innerOpsMap.put(foreach, newForEach);
+ try {
+ translateInnerPlanConnection(foreach, newForEach);
+ } catch (FrontendException e) {
+ throw new VisitorException(e);
+ }
+ }
}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.pig.impl.logicalLayer.ExpressionOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -44,6 +45,8 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOOr;
import org.apache.pig.impl.logicalLayer.LOProject;
import org.apache.pig.impl.logicalLayer.LORegexp;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LOSubtract;
import org.apache.pig.impl.logicalLayer.LOUserFunc;
import org.apache.pig.impl.logicalLayer.LOVisitor;
@@ -86,14 +89,19 @@ public class LogicalExpPlanMigrationVist
protected org.apache.pig.newplan.logical.expression.LogicalExpressionPlan exprPlan;
protected HashMap<LogicalOperator, LogicalExpression> exprOpsMap;
protected LogicalRelationalOperator attachedRelationalOp;
+ protected LogicalOperator oldAttachedRelationalOp;
protected LogicalPlan outerPlan;
+ protected Map<LogicalExpression, LogicalOperator> scalarAliasMap = new HashMap<LogicalExpression, LogicalOperator>();
- public LogicalExpPlanMigrationVistor(LogicalPlan expressionPlan, LogicalRelationalOperator attachedOperator, LogicalPlan outerPlan) {
+ public LogicalExpPlanMigrationVistor(LogicalPlan expressionPlan, LogicalOperator oldAttachedOperator,
+ LogicalRelationalOperator attachedOperator, LogicalPlan outerPlan, Map<LogicalExpression, LogicalOperator> scalarMap) {
super(expressionPlan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(expressionPlan));
exprPlan = new org.apache.pig.newplan.logical.expression.LogicalExpressionPlan();
exprOpsMap = new HashMap<LogicalOperator, LogicalExpression>();
attachedRelationalOp = attachedOperator;
+ oldAttachedRelationalOp = oldAttachedOperator;
this.outerPlan = outerPlan;
+ scalarAliasMap = scalarMap;
}
private void translateConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) {
@@ -122,8 +130,14 @@ public class LogicalExpPlanMigrationVist
}
else {
LogicalOperator lg = project.getExpression();
- LogicalOperator succed = outerPlan.getSuccessors(lg).get(0);
- int input = outerPlan.getPredecessors(succed).indexOf(lg);
+ int input;
+ if (oldAttachedRelationalOp instanceof LOSplitOutput) {
+ LOSplit split = (LOSplit)outerPlan.getPredecessors(oldAttachedRelationalOp).get(0);
+ input = outerPlan.getPredecessors(split).indexOf(lg);
+ }
+ else {
+ input = outerPlan.getPredecessors(oldAttachedRelationalOp).indexOf(lg);
+ }
pe = new ProjectExpression(exprPlan, input, project.isStar()?-1:col, attachedRelationalOp);
}
@@ -200,6 +214,11 @@ public class LogicalExpPlanMigrationVist
}
exprOpsMap.put(op, exp);
+ // We need to track all the scalars
+ if(op.getImplicitReferencedOperator() != null) {
+ scalarAliasMap.put(exp, op.getImplicitReferencedOperator());
+ }
+
}
public void visit(LOBinCond op) throws VisitorException {
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -56,7 +57,9 @@ import org.apache.pig.newplan.logical.re
*/
public class LogicalPlanMigrationVistor extends LOVisitor {
private org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan;
- private HashMap<LogicalOperator, LogicalRelationalOperator> opsMap;
+ private Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+ private Map<org.apache.pig.newplan.logical.expression.LogicalExpression, LogicalOperator> scalarAliasMap =
+ new HashMap<org.apache.pig.newplan.logical.expression.LogicalExpression, LogicalOperator>();
public LogicalPlanMigrationVistor(LogicalPlan plan) {
super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
@@ -75,11 +78,11 @@ public class LogicalPlanMigrationVistor
}
}
- private LogicalExpressionPlan translateExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op) throws VisitorException {
+ private LogicalExpressionPlan translateExpressionPlan(LogicalPlan lp, LogicalOperator oldOp, LogicalRelationalOperator op) throws VisitorException {
PlanWalker<LogicalOperator, LogicalPlan> childWalker =
new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
- LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, mPlan);
+ LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, oldOp, op, mPlan, scalarAliasMap);
childWalker.walk(childPlanVisitor);
return childPlanVisitor.exprPlan;
@@ -95,6 +98,8 @@ public class LogicalPlanMigrationVistor
org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE grouptype;
if( cg.getGroupType() == GROUPTYPE.COLLECTED ) {
grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.COLLECTED;
+ } else if (cg.getGroupType() == GROUPTYPE.MERGE ){
+ grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.MERGE;
} else {
grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.REGULAR;
}
@@ -113,12 +118,14 @@ public class LogicalPlanMigrationVistor
ArrayList<LogicalPlan> plans =
(ArrayList<LogicalPlan>) cg.getGroupByPlans().get(inputs.get(i));
for( LogicalPlan plan : plans ) {
- LogicalExpressionPlan expPlan = translateExpressionPlan(plan, newCogroup);
+ LogicalExpressionPlan expPlan = translateExpressionPlan(plan, cg, newCogroup);
newExpressionPlans.put(Integer.valueOf(i), expPlan);
}
}
newCogroup.setAlias(cg.getAlias());
+ newCogroup.setRequestedParallelism(cg.getRequestedParallelism());
+ newCogroup.setCustomPartitioner(cg.getCustomPartitioner());
logicalPlan.add(newCogroup);
opsMap.put(cg, newCogroup);
@@ -154,12 +161,13 @@ public class LogicalPlanMigrationVistor
for (int i=0; i<inputs.size(); i++) {
List<LogicalPlan> plans = (List<LogicalPlan>) loj.getJoinPlans().get(inputs.get(i));
for (LogicalPlan lp : plans) {
- joinPlans.put(i, translateExpressionPlan(lp, join));
+ joinPlans.put(i, translateExpressionPlan(lp, loj, join));
}
}
join.setAlias(loj.getAlias());
join.setRequestedParallelism(loj.getRequestedParallelism());
+ join.setCustomPartitioner(join.getCustomPartitioner());
logicalPlan.add(join);
opsMap.put(loj, join);
@@ -173,6 +181,7 @@ public class LogicalPlanMigrationVistor
newCross.setAlias(cross.getAlias());
newCross.setRequestedParallelism(cross.getRequestedParallelism());
+ newCross.setCustomPartitioner(cross.getCustomPartitioner());
logicalPlan.add(newCross);
opsMap.put(cross, newCross);
@@ -205,7 +214,7 @@ public class LogicalPlanMigrationVistor
try {
for(int i=0; i<ll.size(); i++) {
LogicalPlan lp = ll.get(i);
- ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan);
+ ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan, scalarAliasMap);
v.visit();
expPlans.add(v.exprPlan);
@@ -231,7 +240,7 @@ public class LogicalPlanMigrationVistor
newSortPlans, sort.getAscendingCols(), sort.getUserFunc());
for (LogicalPlan sortPlan : sortPlans) {
- LogicalExpressionPlan newSortPlan = translateExpressionPlan(sortPlan, newSort);
+ LogicalExpressionPlan newSortPlan = translateExpressionPlan(sortPlan, sort, newSort);
newSortPlans.add(newSortPlan);
}
@@ -256,9 +265,17 @@ public class LogicalPlanMigrationVistor
}
public void visit(LOStream stream) throws VisitorException {
+
+ LogicalSchema s;
+ try {
+ s = Util.translateSchema(stream.getSchema());
+ }catch(Exception e) {
+ throw new VisitorException("Failed to translate schema.", e);
+ }
+
org.apache.pig.newplan.logical.relational.LOStream newStream =
new org.apache.pig.newplan.logical.relational.LOStream(logicalPlan,
- stream.getExecutableManager(), stream.getStreamingCommand());
+ stream.getExecutableManager(), stream.getStreamingCommand(), s);
newStream.setAlias(stream.getAlias());
newStream.setRequestedParallelism(stream.getRequestedParallelism());
@@ -272,7 +289,7 @@ public class LogicalPlanMigrationVistor
org.apache.pig.newplan.logical.relational.LOFilter newFilter = new org.apache.pig.newplan.logical.relational.LOFilter(logicalPlan);
LogicalPlan filterPlan = filter.getComparisonPlan();
- LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, newFilter);
+ LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, filter, newFilter);
newFilter.setFilterPlan(newFilterPlan);
newFilter.setAlias(filter.getAlias());
@@ -327,6 +344,10 @@ public class LogicalPlanMigrationVistor
newStore.setAlias(store.getAlias());
newStore.setRequestedParallelism(store.getRequestedParallelism());
+ newStore.setSignature(store.getSignature());
+ newStore.setInputSpec(store.getInputSpec());
+ newStore.setSortInfo(store.getSortInfo());
+ newStore.setTmpStore(store.isTmpStore());
logicalPlan.add(newStore);
opsMap.put(store, newStore);
@@ -349,7 +370,7 @@ public class LogicalPlanMigrationVistor
new org.apache.pig.newplan.logical.relational.LOSplitOutput(logicalPlan);
LogicalPlan filterPlan = splitOutput.getConditionPlan();
- LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, newSplitOutput);
+ LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, splitOutput, newSplitOutput);
newSplitOutput.setFilterPlan(newFilterPlan);
newSplitOutput.setAlias(splitOutput.getAlias());
@@ -366,11 +387,18 @@ public class LogicalPlanMigrationVistor
newDistinct.setAlias(distinct.getAlias());
newDistinct.setRequestedParallelism(distinct.getRequestedParallelism());
+ newDistinct.setCustomPartitioner(distinct.getCustomPartitioner());
logicalPlan.add(newDistinct);
opsMap.put(distinct, newDistinct);
translateConnection(distinct, newDistinct);
}
+ public void finish() {
+ for(org.apache.pig.newplan.logical.expression.LogicalExpression exp: scalarAliasMap.keySet()) {
+ ((org.apache.pig.newplan.logical.expression.UserFuncExpression)exp).setImplicitReferencedOperator(
+ opsMap.get(scalarAliasMap.get(exp)));
+ }
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java Mon Aug 23 19:10:32 2010
@@ -64,6 +64,13 @@ public class Util {
return newFs;
}
+ /**
+ * This function translates the new LogicalSchema into the old Schema format required
+ * by PhysicalOperators.
+ * @param schema LogicalSchema to be converted to Schema
+ * @return Schema that is converted from LogicalSchema
+ * (No checked exception is thrown: a FrontendException raised while building a field is caught internally.)
+ */
public static Schema translateSchema(LogicalSchema schema) {
if (schema == null) {
return null;
@@ -75,6 +82,7 @@ public class Util {
Schema.FieldSchema f2 = null;
try {
f2 = new Schema.FieldSchema(f.alias, translateSchema(f.schema), f.type);
+ f2.canonicalName = ((Long)f.uid).toString();
s2.add(f2);
} catch (FrontendException e) {
}
@@ -99,14 +107,14 @@ public class Util {
return newFs;
}
- public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op,
+ public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op, int branch,
Set<Integer> columnsToDrop) throws FrontendException {
LOForEach foreach = new LOForEach(plan);
plan.add(foreach);
List<Operator> next = plan.getSuccessors(op);
if (next != null) {
- LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(0);
+ LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(branch);
Pair<Integer, Integer> pos = plan.disconnect(op, nextOp);
plan.connect(foreach, pos.first, nextOp, pos.second);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java Mon Aug 23 19:10:32 2010
@@ -76,7 +76,10 @@ public class CastExpression extends Unar
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
// Bring back the top level uid, this is not changed
LogicalExpression exp = (LogicalExpression)plan.getSuccessors(this).get(0);
- fieldSchema.uid = exp.getFieldSchema().uid;
+ if (exp.getFieldSchema()!=null) {
+ fieldSchema.uid = exp.getFieldSchema().uid;
+ fieldSchema.alias = exp.getFieldSchema().alias;
+ }
return fieldSchema;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java Mon Aug 23 19:10:32 2010
@@ -127,16 +127,29 @@ public class DereferenceExpression exten
LogicalExpression successor = (LogicalExpression)plan.getSuccessors(this).get(0);
LogicalFieldSchema predFS = successor.getFieldSchema();
if (predFS!=null) {
- LogicalSchema innerSchema = new LogicalSchema();
if (columns.size()>1 || predFS.type==DataType.BAG) {
- for (int column:columns) {
- innerSchema.addField(predFS.schema.getField(column));
+ LogicalSchema innerSchema = null;
+ if (predFS.schema!=null) {
+ innerSchema = new LogicalSchema();
+ LogicalSchema realSchema;
+ if (predFS.schema.isTwoLevelAccessRequired()) {
+ realSchema = predFS.schema.getField(0).schema;
+ }
+ else {
+ realSchema = predFS.schema;
+ }
+ if (realSchema!=null) {
+ for (int column:columns) {
+ innerSchema.addField(realSchema.getField(column));
+ }
+ }
}
fieldSchema = new LogicalSchema.LogicalFieldSchema(null, innerSchema, predFS.type,
LogicalExpression.getNextUid());
}
else { // Dereference a field out of a tuple
- fieldSchema = predFS.schema.getField(columns.get(0));
+ if (predFS.schema!=null)
+ fieldSchema = predFS.schema.getField(columns.get(0));
}
}
return fieldSchema;
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Aug 23 19:10:32 2010
@@ -78,21 +78,21 @@ public class ExpToPhyTranslationVisitor
// This value points to the current LogicalRelationalOperator we are working on
protected LogicalRelationalOperator currentOp;
+ protected Map<PhysicalOperator, LogicalRelationalOperator> scalarAliasMap;
- public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException {
- super(plan, new DependencyOrderWalker(plan));
- currentOp = op;
- logToPhyMap = map;
- currentPlan = phyPlan;
- currentPlans = new Stack<PhysicalPlan>();
+ public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan,
+ Map<Operator, PhysicalOperator> map, Map<PhysicalOperator, LogicalRelationalOperator> scalarMap) throws FrontendException {
+ this(plan, new DependencyOrderWalker(plan), op, phyPlan, map, scalarMap);
}
- public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException {
+ public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map,
+ Map<PhysicalOperator, LogicalRelationalOperator> scalarMap) throws FrontendException {
super(plan, walker);
currentOp = op;
logToPhyMap = map;
currentPlan = phyPlan;
currentPlans = new Stack<PhysicalPlan>();
+ scalarAliasMap = scalarMap;
}
protected Map<Operator, PhysicalOperator> logToPhyMap;
@@ -374,7 +374,7 @@ public class ExpToPhyTranslationVisitor
.getExpression());
pIsNull.setExpr(from);
pIsNull.setResultType(op.getType());
- pIsNull.setOperandType(op.getType());
+ pIsNull.setOperandType(op.getExpression().getType());
try {
currentPlan.connect(from, pIsNull);
} catch (PlanException e) {
@@ -394,6 +394,7 @@ public class ExpToPhyTranslationVisitor
logToPhyMap.put(op, pNegative);
ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op
.getExpression());
+ pNegative.setExpr(from);
pNegative.setResultType(op.getType());
try {
currentPlan.connect(from, pNegative);
@@ -507,6 +508,11 @@ public class ExpToPhyTranslationVisitor
}
}
logToPhyMap.put(op, p);
+
+ //We need to track all the scalars
+ if(op.getImplicitReferencedOperator() != null) {
+ scalarAliasMap.put(p, (LogicalRelationalOperator)op.getImplicitReferencedOperator());
+ }
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Mon Aug 23 19:10:32 2010
@@ -123,39 +123,42 @@ public class ProjectExpression extends C
LogicalSchema schema = referent.getSchema();
- if (schema == null) {
- fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
- uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
- }
- else {
- if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
- if (!(findReferent() instanceof LOInnerLoad)||
- ((LOInnerLoad)findReferent()).sourceIsBag()) {
- String alias = findReferent().getAlias();
- List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
-
- // pull tuple information from innerload
- if (innerLoads.get(0).getProjection().getFieldSchema().schema.isTwoLevelAccessRequired()) {
- LogicalFieldSchema originalTupleFieldSchema = innerLoads.get(0).getProjection().getFieldSchema().schema.getField(0);
- LogicalFieldSchema newTupleFieldSchema = new LogicalFieldSchema(originalTupleFieldSchema.alias,
- schema, DataType.TUPLE);
- newTupleFieldSchema.uid = originalTupleFieldSchema.uid;
- LogicalSchema newTupleSchema = new LogicalSchema();
- newTupleSchema.setTwoLevelAccessRequired(true);
- newTupleSchema.addField(newTupleFieldSchema);
- fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, newTupleSchema, DataType.BAG);
- }
- else {
- fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, schema, DataType.BAG);
- }
- fieldSchema.uid = innerLoads.get(0).getProjection().getFieldSchema().uid;
+ if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
+ if (!(findReferent() instanceof LOInnerLoad)||
+ ((LOInnerLoad)findReferent()).sourceIsBag()) {
+ String alias = findReferent().getAlias();
+ List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
+
+ // pull tuple information from innerload
+ if (innerLoads.get(0).getProjection().getFieldSchema().schema!=null &&
+ innerLoads.get(0).getProjection().getFieldSchema().schema.isTwoLevelAccessRequired()) {
+ LogicalFieldSchema originalTupleFieldSchema = innerLoads.get(0).getProjection().getFieldSchema().schema.getField(0);
+ LogicalFieldSchema newTupleFieldSchema = new LogicalFieldSchema(originalTupleFieldSchema.alias,
+ schema, DataType.TUPLE);
+ newTupleFieldSchema.uid = originalTupleFieldSchema.uid;
+ LogicalSchema newTupleSchema = new LogicalSchema();
+ newTupleSchema.setTwoLevelAccessRequired(true);
+ newTupleSchema.addField(newTupleFieldSchema);
+ fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, newTupleSchema, DataType.BAG);
}
else {
- fieldSchema = findReferent().getSchema().getField(0);
+ fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, schema, DataType.BAG);
}
- uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+ fieldSchema.uid = innerLoads.get(0).getProjection().getFieldSchema().uid;
}
else {
+ if (findReferent().getSchema()!=null)
+ fieldSchema = findReferent().getSchema().getField(0);
+ }
+ if (fieldSchema!=null)
+ uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+ }
+ else {
+ if (schema == null) {
+ fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
+ uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+ }
+ else {
int index = -1;
if (!isProjectStar() && uidOnlyFieldSchema!=null) {
long uid = uidOnlyFieldSchema.uid;
@@ -170,11 +173,17 @@ public class ProjectExpression extends C
index = col;
if (!isProjectStar()) {
- fieldSchema = schema.getField(index);
+ if (schema!=null && schema.size()>index)
+ fieldSchema = schema.getField(index);
+ else
+ fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
uidOnlyFieldSchema = fieldSchema.cloneUid();
}
else {
- fieldSchema = new LogicalSchema.LogicalFieldSchema(null, schema.deepCopy(), DataType.TUPLE);
+ LogicalSchema newTupleSchema = null;
+ if (schema!=null)
+ newTupleSchema = schema.deepCopy();
+ fieldSchema = new LogicalSchema.LogicalFieldSchema(null, newTupleSchema, DataType.TUPLE);
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
}
}
@@ -194,7 +203,7 @@ public class ProjectExpression extends C
if (preds == null || input >= preds.size()) {
throw new FrontendException("Projection with nothing to reference!", 2225);
}
-
+
LogicalRelationalOperator pred =
(LogicalRelationalOperator)preds.get(input);
if (pred == null) {
@@ -215,6 +224,8 @@ public class ProjectExpression extends C
public String toString() {
StringBuilder msg = new StringBuilder();
+ if (fieldSchema!=null && fieldSchema.alias!=null)
+ msg.append(fieldSchema.alias+":");
msg.append("(Name: " + name + " Type: ");
if (fieldSchema!=null)
msg.append(DataType.findTypeName(fieldSchema.type));
@@ -242,4 +253,18 @@ public class ProjectExpression extends C
public void setAttachedRelationalOp(LogicalRelationalOperator attachedRelationalOp) {
this.attachedRelationalOp = attachedRelationalOp;
}
+
+ @Override
+ public byte getType() throws FrontendException {
+ // for boundary project, if
+ if (getFieldSchema()==null) {
+ if (attachedRelationalOp instanceof LOGenerate && findReferent() instanceof
+ LOInnerLoad) {
+ if (((LOInnerLoad)findReferent()).getProjection().getColNum()==-1)
+ return DataType.TUPLE;
+ }
+ return DataType.BYTEARRAY;
+ }
+ return super.getType();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Mon Aug 23 19:10:32 2010
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
public class UserFuncExpression extends LogicalExpression {
private FuncSpec mFuncSpec;
+ private Operator implicitReferencedOperator = null;
public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
super("UserFunc", plan);
@@ -97,8 +98,14 @@ public class UserFuncExpression extends
LogicalSchema inputSchema = new LogicalSchema();
List<Operator> succs = plan.getSuccessors(this);
- for(Operator lo : succs){
- inputSchema.addField(((LogicalExpression)lo).getFieldSchema());
+ if (succs!=null) {
+ for(Operator lo : succs){
+ if (((LogicalExpression)lo).getFieldSchema()==null) {
+ inputSchema = null;
+ break;
+ }
+ inputSchema.addField(((LogicalExpression)lo).getFieldSchema());
+ }
}
EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
@@ -120,4 +127,12 @@ public class UserFuncExpression extends
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
return fieldSchema;
}
+
+ public Operator getImplicitReferencedOperator() {
+ return implicitReferencedOperator;
+ }
+
+ public void setImplicitReferencedOperator(Operator implicitReferencedOperator) {
+ this.implicitReferencedOperator = implicitReferencedOperator;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Mon Aug 23 19:10:32 2010
@@ -74,7 +74,7 @@ public abstract class AllExpressionVisit
@Override
public void visit(LOJoin join) throws FrontendException {
currentOp = join;
- Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+ Collection<LogicalExpressionPlan> c = join.getExpressionPlanValues();
for (LogicalExpressionPlan plan : c) {
LogicalExpressionVisitor v = getVisitor(plan);
v.visit();
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Mon Aug 23 19:10:32 2010
@@ -22,15 +22,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.rules.AddForEach;
import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeFilter;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
import org.apache.pig.newplan.logical.rules.SplitFilter;
+import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
import org.apache.pig.newplan.logical.rules.TypeCastInserter;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
@@ -61,7 +64,9 @@ public class LogicalPlanOptimizer extend
// This set of rules Insert Foreach dedicated for casting after load
s = new HashSet<Rule>();
// add split filter rule
- r = new TypeCastInserter("TypeCastInserter", LOLoad.class.getName());
+ r = new LoadTypeCastInserter("LoadTypeCastInserter");
+ checkAndAddRule(s, r);
+ r = new StreamTypeCastInserter("StreamTypeCastInserter");
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Mon Aug 23 19:10:32 2010
@@ -42,6 +42,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -107,7 +108,7 @@ public class LogicalPlanPrinter extends
LogicalExpressionVisitor v = null;
level++;
- for (LogicalExpressionPlan plan : op.getExpressionPlans()) {
+ for (LogicalExpressionPlan plan : op.getExpressionPlanValues()) {
v = getVisitor(plan);
v.visit();
}
@@ -208,6 +209,12 @@ public class LogicalPlanPrinter extends
stream.println( op.toString() );
}
+ @Override
+ public void visit(LOStream op) throws FrontendException {
+ printLevel();
+ stream.println( op.toString() );
+ }
+
public String toString() {
return stream.toString();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Mon Aug 23 19:10:32 2010
@@ -8,6 +8,7 @@ import org.apache.pig.impl.util.MultiMap
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -58,7 +59,7 @@ public class SchemaResetter extends Logi
@Override
public void visit(LOJoin join) throws FrontendException {
join.resetSchema();
- Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlans();
+ Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlanValues();
for (LogicalExpressionPlan joinPlan : joinPlans) {
FieldSchemaResetter fsResetter = new FieldSchemaResetter(joinPlan);
fsResetter.visit();
@@ -165,7 +166,7 @@ public class SchemaResetter extends Logi
class FieldSchemaResetter extends AllSameExpressionVisitor {
protected FieldSchemaResetter(OperatorPlan p) throws FrontendException {
- super(p, new DependencyOrderWalker(p));
+ super(p, new ReverseDependencyOrderWalker(p));
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Mon Aug 23 19:10:32 2010
@@ -49,7 +49,8 @@ public class LOCogroup extends LogicalRe
*/
public static enum GROUPTYPE {
REGULAR, // Regular (co)group
- COLLECTED // Collected group
+ COLLECTED, // Collected group
+ MERGE // Map-side CoGroup on sorted data
};
private GROUPTYPE mGroupType;
@@ -175,11 +176,6 @@ public class LOCogroup extends LogicalRe
int counter = 0;
for (Operator op : inputs) {
LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
- // the schema of one input is unknown, so the join schema is unknown, just return
- if (inputSchema == null) {
- schema = null;
- return schema;
- }
// Check if we already have calculated Uid for this bag for given
// input operator
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java Mon Aug 23 19:10:32 2010
@@ -33,10 +33,14 @@ public class LODistinct extends LogicalR
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Mon Aug 23 19:10:32 2010
@@ -46,9 +46,14 @@ public class LOFilter extends LogicalRel
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Aug 23 19:10:32 2010
@@ -51,6 +51,10 @@ public class LOGenerate extends LogicalR
LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
LogicalFieldSchema fieldSchema = null;
+ if (exp.getFieldSchema()==null) {
+ schema = null;
+ break;
+ }
fieldSchema = exp.getFieldSchema().deepCopy();
if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) {
@@ -58,19 +62,26 @@ public class LOGenerate extends LogicalR
schema.addField(fieldSchema);
continue;
} else {
+ // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+ if (fieldSchema.schema==null) {
+ schema=null;
+ break;
+ }
// if flatten is set, set schema of tuple field to this schema
List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
if (flattenFlags[i]) {
if (fieldSchema.type == DataType.BAG) {
// if it is bag of tuples, get the schema of tuples
- if (fieldSchema.schema.isTwoLevelAccessRequired()) {
- // assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
- innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
- } else {
- innerFieldSchemas = fieldSchema.schema.getFields();
- }
- for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
- fs.alias = fieldSchema.alias + "::" + fs.alias;
+ if (fieldSchema.schema!=null) {
+ if (fieldSchema.schema.isTwoLevelAccessRequired()) {
+ // assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
+ innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
+ } else {
+ innerFieldSchemas = fieldSchema.schema.getFields();
+ }
+ for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
+ fs.alias = fieldSchema.alias + "::" + fs.alias;
+ }
}
} else { // DataType.TUPLE
innerFieldSchemas = fieldSchema.schema.getFields();
@@ -79,7 +90,6 @@ public class LOGenerate extends LogicalR
}
}
-
for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
schema.addField(fs);
}
@@ -87,6 +97,8 @@ public class LOGenerate extends LogicalR
schema.addField(fieldSchema);
}
}
+ if (schema!=null && schema.size()==0)
+ schema = null;
return schema;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java Mon Aug 23 19:10:32 2010
@@ -17,6 +17,8 @@
*/
package org.apache.pig.newplan.logical.relational;
+import java.util.Map;
+
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
@@ -53,23 +55,31 @@ public class LOInnerLoad extends Logical
if (schema!=null)
return schema;
- if (prj.getFieldSchema()!=null) {
- schema = new LogicalSchema();
- if (prj.getFieldSchema().type==DataType.BAG && prj.getFieldSchema().schema.isTwoLevelAccessRequired()) {
- LogicalFieldSchema tupleSchema = prj.getFieldSchema().schema.getField(0);
- for (int i=0;i<tupleSchema.schema.size();i++)
- schema.addField(tupleSchema.schema.getField(i));
- sourceIsBag = true;
- alias = prj.getFieldSchema().alias;
- }
- else if (prj.getFieldSchema().type==DataType.BAG){
- for (int i=0;i<prj.getFieldSchema().schema.size();i++)
- schema.addField(prj.getFieldSchema().schema.getField(i));
- sourceIsBag = true;
- alias = prj.getFieldSchema().alias;
- }
- else {
- schema.addField(prj.getFieldSchema());
+ if (prj.findReferent().getSchema()!=null) {
+ if (prj.getFieldSchema()!=null) {
+ if (prj.getFieldSchema().type==DataType.BAG && prj.getFieldSchema().schema!=null &&
+ prj.getFieldSchema().schema.isTwoLevelAccessRequired()) {
+ schema = new LogicalSchema();
+ LogicalFieldSchema tupleSchema = prj.getFieldSchema().schema.getField(0);
+ for (int i=0;i<tupleSchema.schema.size();i++)
+ schema.addField(tupleSchema.schema.getField(i));
+ sourceIsBag = true;
+ alias = prj.getFieldSchema().alias;
+ }
+ else if (prj.getFieldSchema().type==DataType.BAG){
+ sourceIsBag = true;
+ alias = prj.getFieldSchema().alias;
+ if (prj.getFieldSchema().schema!=null) {
+ schema = new LogicalSchema();
+ for (int i=0;i<prj.getFieldSchema().schema.size();i++)
+ schema.addField(prj.getFieldSchema().schema.getField(i));
+ }
+
+ }
+ else {
+ schema = new LogicalSchema();
+ schema.addField(prj.getFieldSchema());
+ }
}
}
return schema;
@@ -111,4 +121,31 @@ public class LOInnerLoad extends Logical
public boolean sourceIsBag() {
return sourceIsBag;
}
+
+ public String toString() {
+ StringBuilder msg = new StringBuilder();
+
+ if (alias!=null) {
+ msg.append(alias + ": ");
+ }
+ msg.append("(Name: " + name);
+ msg.append("[");
+ if (getProjection().getColNum()==-1)
+ msg.append("*");
+ else
+ msg.append(getProjection().getColNum());
+ msg.append("]");
+ msg.append(" Schema: ");
+ if (schema!=null)
+ msg.append(schema);
+ else
+ msg.append("null");
+ msg.append(")");
+ if (annotations!=null) {
+ for (Map.Entry<String, Object> entry : annotations.entrySet()) {
+ msg.append(entry);
+ }
+ }
+ return msg.toString();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Mon Aug 23 19:10:32 2010
@@ -90,7 +90,11 @@ public class LOJoin extends LogicalRelat
* Get all of the expressions plans that are in this join.
* @return collection of all expression plans.
*/
- public Collection<LogicalExpressionPlan> getExpressionPlans() {
+ public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
+ return mJoinPlans;
+ }
+
+ public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
return mJoinPlans.values();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Mon Aug 23 19:10:32 2010
@@ -44,10 +44,14 @@ public class LOLimit extends LogicalRela
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Mon Aug 23 19:10:32 2010
@@ -18,16 +18,22 @@
package org.apache.pig.newplan.logical.relational;
+import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.Util;
public class LOLoad extends LogicalRelationalOperator {
@@ -56,7 +62,7 @@ public class LOLoad extends LogicalRelat
public LoadFunc getLoadFunc() throws FrontendException {
try {
- if (loadFunc == null) {
+ if (loadFunc == null && fs!=null) {
loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
loadFunc.setUDFContextSignature(getAlias());
}
@@ -88,8 +94,8 @@ public class LOLoad extends LogicalRelat
return schema;
LogicalSchema originalSchema = null;
- // TODO get schema from LoaderMetadata interface.
- if (determinedSchema!=null) {
+
+ if (determinedSchema==null) {
determinedSchema = getSchemaFromMetaData();
}
@@ -128,7 +134,16 @@ public class LOLoad extends LogicalRelat
return schema;
}
- private LogicalSchema getSchemaFromMetaData() {
+ private LogicalSchema getSchemaFromMetaData() throws FrontendException {
+ if (getLoadFunc()!=null && getLoadFunc() instanceof LoadMetadata) {
+ try {
+ ResourceSchema resourceSchema = ((LoadMetadata)loadFunc).getSchema(getFileSpec().getFileName(), new Job(conf));
+ Schema oldSchema = Schema.getPigSchema(resourceSchema);
+ return Util.translateSchema(oldSchema);
+ } catch (IOException e) {
+ throw new FrontendException("Cannot get schema from loadFunc " + loadFunc.getClass().getName(), 9999, e);
+ }
+ }
return null;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Mon Aug 23 19:10:32 2010
@@ -97,9 +97,14 @@ public class LOSort extends LogicalRelat
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java Mon Aug 23 19:10:32 2010
@@ -31,10 +31,14 @@ public class LOSplit extends LogicalRela
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java Mon Aug 23 19:10:32 2010
@@ -44,10 +44,14 @@ public class LOSplitOutput extends Logic
@Override
public LogicalSchema getSchema() throws FrontendException {
+ if (schema!=null)
+ return schema;
+
LogicalRelationalOperator input = null;
input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- return input.getSchema();
+ schema = input.getSchema();
+ return schema;
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon Aug 23 19:10:32 2010
@@ -19,6 +19,7 @@ package org.apache.pig.newplan.logical.r
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.SortInfo;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -29,7 +30,15 @@ import org.apache.pig.newplan.PlanVisito
public class LOStore extends LogicalRelationalOperator {
private static final long serialVersionUID = 2L;
- private FileSpec output;
+ private FileSpec output;
+
+ // If we know how to reload the store, here's how. The mInputSpec
+ // FileSpec is set in PigServer.postProcess. It can be used to
+ // reload this store, if the optimizer has the need.
+ private FileSpec mInputSpec;
+ private String signature;
+ private boolean isTmpStore;
+ private SortInfo sortInfo;
transient private StoreFuncInterface storeFunc;
//private static Log log = LogFactory.getLog(LOStore.class);
@@ -85,4 +94,36 @@ public class LOStore extends LogicalRela
return false;
}
}
+
+ public SortInfo getSortInfo() {
+ return sortInfo;
+ }
+
+ public void setSortInfo(SortInfo sortInfo) {
+ this.sortInfo = sortInfo;
+ }
+
+ public boolean isTmpStore() {
+ return isTmpStore;
+ }
+
+ public void setTmpStore(boolean isTmpStore) {
+ this.isTmpStore = isTmpStore;
+ }
+
+ public void setInputSpec(FileSpec in) {
+ mInputSpec = in;
+ }
+
+ public FileSpec getInputSpec() {
+ return mInputSpec;
+ }
+
+ public String getSignature() {
+ return signature;
+ }
+
+ public void setSignature(String sig) {
+ signature = sig;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java Mon Aug 23 19:10:32 2010
@@ -17,6 +17,7 @@
*/
package org.apache.pig.newplan.logical.relational;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
@@ -25,6 +26,7 @@ import org.apache.pig.newplan.PlanVisito
public class LOStream extends LogicalRelationalOperator {
+ private LogicalSchema scriptSchema;
private static final long serialVersionUID = 2L;
//private static Log log = LogFactory.getLog(LOFilter.class);
@@ -32,11 +34,14 @@ public class LOStream extends LogicalRel
// Stream Operator this operator represents
private StreamingCommand command;
transient private ExecutableManager executableManager;
+ private LogicalSchema uidOnlySchema;
+ private boolean castInserted = false;
- public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd) {
- super("LODistinct", plan);
+ public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd, LogicalSchema schema) {
+ super("LOStream", plan);
command = cmd;
executableManager = exeManager;
+ scriptSchema = schema;
}
/**
@@ -60,10 +65,22 @@ public class LOStream extends LogicalRel
public LogicalSchema getSchema() throws FrontendException {
if (schema!=null)
return schema;
- LogicalRelationalOperator input = null;
- input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
- schema = input.getSchema();
+ if (isCastInserted()) {
+ schema = new LogicalSchema();
+ for (int i=0;i<scriptSchema.size();i++) {
+ LogicalSchema.LogicalFieldSchema fs = scriptSchema.getField(i).deepCopy();
+ fs.type = DataType.BYTEARRAY;
+ schema.addField(fs);
+ }
+ } else {
+ if (scriptSchema!=null)
+ schema = scriptSchema.deepCopy();
+ }
+
+ if (schema!=null)
+ uidOnlySchema = schema.mergeUid(uidOnlySchema);
+
return schema;
}
@@ -83,5 +100,13 @@ public class LOStream extends LogicalRel
return false;
}
}
+
+ public void setCastInserted(boolean flag) {
+ castInserted = flag;
+ }
+
+ public boolean isCastInserted() {
+ return castInserted;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Mon Aug 23 19:10:32 2010
@@ -46,38 +46,34 @@ public class LOUnion extends LogicalRela
List<Operator> inputs = null;
inputs = plan.getPredecessors(this);
- // If any predecessor's schema is null, or length of predecessor's schema does not match,
- // then the schema for union is null
- int length = -1;
+ // If any predecessor's schema is null, then the schema for union is null
for (Operator input : inputs) {
LogicalRelationalOperator op = (LogicalRelationalOperator)input;
if (op.getSchema()==null)
return null;
- if (length==-1)
- length = op.getSchema().size();
- else {
- if (op.getSchema().size()!=length)
- return null;
- }
}
- // Check if all predecessor's schema are compatible.
- // TODO: Migrate all existing schema merging rules
- LogicalSchema schema0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
- for (int i=1;i<inputs.size();i++) {
+ LogicalSchema s0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+ if (inputs.size()==1)
+ return s0;
+ LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+ LogicalSchema mergedSchema = LogicalSchema.merge(s0, s1);
+
+ // Merge schema
+ for (int i=2;i<inputs.size();i++) {
LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
- if (!schema0.isEqual(otherSchema))
+ mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+ if (mergedSchema == null)
return null;
}
- // Generate merged schema based on schema of first input
- schema = new LogicalSchema();
- for (int i=0;i<schema0.size();i++)
+ // Bring back cached uid if any; otherwise, cache uid generated
+ for (int i=0;i<s0.size();i++)
{
- LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(schema0.getField(i));
+ LogicalSchema.LogicalFieldSchema fs = mergedSchema.getField(i);
long uid = -1;
for (Pair<Long, Long> pair : uidMapping) {
- if (pair.second==schema0.getField(i).uid) {
+ if (pair.second==s0.getField(i).uid) {
uid = pair.first;
break;
}
@@ -91,8 +87,8 @@ public class LOUnion extends LogicalRela
}
fs.uid = uid;
- schema.addField(fs);
}
+ schema = mergedSchema;
return schema;
}