You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/03/05 22:55:20 UTC
svn commit: r919634 [2/3] - in /hadoop/pig/trunk: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/logical/
src/org/apache/pig/experimental/logical/expression/
src/org/apache/pig/experimental/logica...
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java?rev=919634&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/PlanPrinter.java Fri Mar 5 21:55:19 2010
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.optimizer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.pig.experimental.logical.expression.AddExpression;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.DivideExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.IsNullExpression;
+import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
+import org.apache.pig.experimental.logical.expression.LessThanExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.expression.ModExpression;
+import org.apache.pig.experimental.logical.expression.MultiplyExpression;
+import org.apache.pig.experimental.logical.expression.NegativeExpression;
+import org.apache.pig.experimental.logical.expression.NotEqualExpression;
+import org.apache.pig.experimental.logical.expression.NotExpression;
+import org.apache.pig.experimental.logical.expression.OrExpression;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.SubtractExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.DepthFirstWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.experimental.plan.PlanWalker;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.optimizer.Rule.WalkerAlgo;
+
+public class PlanPrinter extends AllExpressionVisitor {
+
+ protected PrintStream stream = null;
+ protected int level = 0;
+
+// private String TAB1 = " ";
+// private String TABMore = "| ";
+// private String LSep = "|\n|---";
+// private String USep = "| |\n| ";
+// private int levelCntr = -1;
+
+ public class DepthFirstMemoryWalker extends DepthFirstWalker {
+
+ private int level = 0;
+ private int startingLevel = 0;
+ private Stack<String> prefixStack;
+ private String currentPrefix = "";
+
+ public DepthFirstMemoryWalker(OperatorPlan plan, int startingLevel) {
+ super(plan);
+ level = startingLevel;
+ this.startingLevel = startingLevel;
+ prefixStack = new Stack<String>();
+ }
+
+ @Override
+ public PlanWalker spawnChildWalker(OperatorPlan plan) {
+ return new DepthFirstMemoryWalker(plan, level);
+ }
+
+ /**
+ * Begin traversing the graph.
+ * @param visitor Visitor this walker is being used by.
+ * @throws IOException if an error is encountered while walking.
+ */
+ @Override
+ public void walk(PlanVisitor visitor) throws IOException {
+ List<Operator> roots = plan.getSources();
+ Set<Operator> seen = new HashSet<Operator>();
+
+ depthFirst(null, roots, seen, visitor);
+ }
+
+ public String getPrefix() {
+ return currentPrefix;
+ }
+
+ private void depthFirst(Operator node,
+ Collection<Operator> successors,
+ Set<Operator> seen,
+ PlanVisitor visitor) throws IOException {
+ if (successors == null) return;
+
+ StringBuilder strb = new StringBuilder();
+ for(int i = 0; i < startingLevel; i++ ) {
+ strb.append("|\t");
+ }
+ if( ((level-1) - startingLevel ) >= 0 )
+ strb.append("\t");
+ for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+ strb.append("|\t");
+ }
+ strb.append( "|\n" );
+ for(int i = 0; i < startingLevel; i++ ) {
+ strb.append("|\t");
+ }
+ if( ((level-1) - startingLevel ) >= 0 )
+ strb.append("\t");
+ for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+ strb.append("|\t");
+ }
+ strb.append("|---");
+ currentPrefix = strb.toString();
+
+ for (Operator suc : successors) {
+ if (seen.add(suc)) {
+ suc.accept(visitor);
+ Collection<Operator> newSuccessors = plan.getSuccessors(suc);
+ level++;
+ prefixStack.push(currentPrefix);
+ depthFirst(suc, newSuccessors, seen, visitor);
+ level--;
+ currentPrefix = prefixStack.pop();
+ }
+ }
+ }
+ }
+
+ /**
+  * Creates a printer that walks {@code plan} with a
+  * ReverseDependencyOrderWalker and writes its output to {@code ps}.
+  *
+  * @param plan plan to print
+  * @param ps stream the textual plan is written to
+  */
+ public PlanPrinter(OperatorPlan plan, PrintStream ps) {
+     super(plan, new ReverseDependencyOrderWalker(plan));
+     this.stream = ps;
+ }
+
+ @Override
+ protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+ return new ExprPrinter(expr, level+1);
+ }
+
+ class ExprPrinter extends LogicalExpressionVisitor {
+
+ protected ExprPrinter(OperatorPlan plan, int startingLevel) {
+ super(plan, new DepthFirstMemoryWalker(plan, startingLevel));
+ }
+
+ private void simplevisit(LogicalExpression exp) {
+ stream.print( ((DepthFirstMemoryWalker)currentWalker).getPrefix() );
+ stream.println( exp.toString() );
+ }
+
+ @Override
+ public void visitAnd(AndExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitOr(OrExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitEqual(EqualExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitProject(ProjectExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitMapLookup(MapLookupExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitConstant(ConstantExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitCast(CastExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitGreaterThan(GreaterThanExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitGreaterThanEqual(GreaterThanEqualExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitLessThan(LessThanExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitLessThanEqual(LessThanEqualExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitNotEqual(NotEqualExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitNot(NotExpression exp ) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitIsNull(IsNullExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitNegative(NegativeExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitAdd(AddExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitSubtract(SubtractExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitMultiply(MultiplyExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitMod(ModExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+
+ @Override
+ public void visitDivide(DivideExpression exp) throws IOException {
+ simplevisit(exp);
+ }
+ }
+
+ /** Prints the load operator on its own line at the current level. */
+ @Override
+ public void visitLOLoad(LOLoad op) throws IOException {
+     printLevel();
+     final String description = op.toString();
+     stream.println(description);
+ }
+
+ /** Prints the store operator on its own line at the current level. */
+ @Override
+ public void visitLOStore(LOStore op) throws IOException {
+     printLevel();
+     final String description = op.toString();
+     stream.println(description);
+ }
+
+ /**
+  * Prints the foreach operator, then walks its inner plan one level
+  * deeper using a child of the current walker.
+  *
+  * @throws IOException if walking the inner plan fails; the level and the
+  *         walker stack are restored before the exception propagates.
+  */
+ @Override
+ public void visitLOForEach(LOForEach op) throws IOException {
+     printLevel();
+     stream.println( op.toString() );
+     level++;
+     try {
+         OperatorPlan innerPlan = op.getInnerPlan();
+         PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+         pushWalker(newWalker);
+         try {
+             currentWalker.walk(this);
+         } finally {
+             // always drop the child walker so the outer walk continues with its own
+             popWalker();
+         }
+     } finally {
+         level--;
+     }
+ }
+
+ /**
+  * Prints the filter operator and then its filter expression plan.
+  *
+  * @throws IOException if visiting the expression plan fails; the level is
+  *         restored before the exception propagates.
+  */
+ @Override
+ public void visitLOFilter(LOFilter op) throws IOException {
+     printLevel();
+     stream.println( op.toString() );
+     // the visitor is created before level is bumped, as getVisitor reads level itself
+     LogicalExpressionVisitor v = getVisitor(op.getFilterPlan());
+     level++;
+     try {
+         v.visit();
+     } finally {
+         level--;
+     }
+ }
+
+ /**
+  * Prints the generate operator and then each of its output expression
+  * plans in order.
+  *
+  * @throws IOException if visiting an expression plan fails; the level is
+  *         restored before the exception propagates.
+  */
+ @Override
+ public void visitLOGenerate(LOGenerate op) throws IOException {
+     printLevel();
+     stream.println( op.toString() );
+     List<LogicalExpressionPlan> plans = op.getOutputPlans();
+     level++;
+     try {
+         for( LogicalExpressionPlan plan : plans ) {
+             LogicalExpressionVisitor v = getVisitor(plan);
+             v.visit();
+         }
+     } finally {
+         level--;
+     }
+ }
+
+ /** Prints the inner-load operator on its own line at the current level. */
+ @Override
+ public void visitLOInnerLoad(LOInnerLoad op) throws IOException {
+     printLevel();
+     final String description = op.toString();
+     stream.println(description);
+ }
+
+ /**
+  * Returns the string form of the underlying stream object, or "null" if
+  * no stream is set.
+  *
+  * NOTE(review): this is the stream's own toString(), which is presumably
+  * not the text that was printed to it -- confirm what callers expect.
+  */
+ @Override
+ public String toString() {
+     // String.valueOf avoids a NullPointerException when stream is null
+     return String.valueOf(stream);
+ }
+
+ private void printLevel() {
+ for(int i =0; i < level; i++ ) {
+ stream.print("|\t");
+ }
+ stream.println("|");
+ for(int i =0; i < level; i++ ) {
+ stream.print("|\t");
+ }
+ stream.print("|---");
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/optimizer/UidStamper.java Fri Mar 5 21:55:19 2010
@@ -21,19 +21,29 @@
import java.io.IOException;
import java.util.List;
+import org.apache.pig.experimental.logical.expression.AddExpression;
import org.apache.pig.experimental.logical.expression.AndExpression;
import org.apache.pig.experimental.logical.expression.CastExpression;
import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.DivideExpression;
import org.apache.pig.experimental.logical.expression.EqualExpression;
import org.apache.pig.experimental.logical.expression.GreaterThanEqualExpression;
import org.apache.pig.experimental.logical.expression.GreaterThanExpression;
+import org.apache.pig.experimental.logical.expression.IsNullExpression;
import org.apache.pig.experimental.logical.expression.LessThanEqualExpression;
import org.apache.pig.experimental.logical.expression.LessThanExpression;
import org.apache.pig.experimental.logical.expression.LogicalExpression;
import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.expression.ModExpression;
+import org.apache.pig.experimental.logical.expression.MultiplyExpression;
+import org.apache.pig.experimental.logical.expression.NegativeExpression;
+import org.apache.pig.experimental.logical.expression.NotEqualExpression;
+import org.apache.pig.experimental.logical.expression.NotExpression;
import org.apache.pig.experimental.logical.expression.OrExpression;
import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.SubtractExpression;
import org.apache.pig.experimental.logical.relational.LOLoad;
import org.apache.pig.experimental.logical.relational.LogicalSchema;
import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
@@ -98,6 +108,11 @@
public void visitProject(ProjectExpression project) throws IOException {
project.setUid(currentOp);
}
+
+ /** Stamps the map-lookup expression by calling setUid with the visitor's currentOp. */
+ @Override
+ public void visitMapLookup( MapLookupExpression op ) throws IOException {
+     op.setUid(currentOp);
+ }
@Override
public void visitConstant(ConstantExpression constant) throws IOException {
@@ -108,6 +123,48 @@
public void visitCast(CastExpression cast) throws IOException {
cast.setUid(currentOp);
}
+
+ // Each visit method below stamps the visited expression by calling
+ // setUid with the visitor's currentOp.
+
+ @Override
+ public void visitNotEqual(NotEqualExpression exp) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ @Override
+ public void visitNot(NotExpression exp ) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ @Override
+ public void visitIsNull(IsNullExpression exp) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ @Override
+ public void visitNegative(NegativeExpression exp) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ @Override
+ public void visitAdd(AddExpression exp) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ @Override
+ public void visitSubtract(SubtractExpression exp) throws IOException {
+     exp.setUid(currentOp);
+ }
+
+ // @Override added on the three methods below for consistency with their
+ // siblings, so a signature drift in the visitor base class fails to compile.
+ @Override
+ public void visitMultiply(MultiplyExpression op) throws IOException {
+     op.setUid(currentOp);
+ }
+
+ @Override
+ public void visitMod(ModExpression op) throws IOException {
+     op.setUid(currentOp);
+ }
+
+ @Override
+ public void visitDivide(DivideExpression op) throws IOException {
+     op.setUid(currentOp);
+ }
}
/* (non-Javadoc)
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOForEach.java Fri Mar 5 21:55:19 2010
@@ -18,6 +18,7 @@
package org.apache.pig.experimental.logical.relational;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import org.apache.pig.experimental.plan.Operator;
@@ -50,6 +51,17 @@
return innerPlan.isEqual(((LOForEach)other).innerPlan);
}
+
+ /**
+  * Resets this operator's schema via the superclass and then resets the
+  * schema of every operator in the inner plan.
+  */
+ @Override
+ public void resetSchema() {
+     super.resetSchema();
+
+     // reset schema in the inner plan
+     Iterator<Operator> iter = innerPlan.getOperators();
+     while(iter.hasNext()) {
+         // every operator in the inner plan is cast to LogicalRelationalOperator
+         LogicalRelationalOperator op = (LogicalRelationalOperator)iter.next();
+         op.resetSchema();
+     }
+ }
@Override
public LogicalSchema getSchema() {
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java Fri Mar 5 21:55:19 2010
@@ -48,20 +48,13 @@
schema = new LogicalSchema();
for(int i=0; i<outputPlans.size(); i++) {
- LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSinks().get(0);
+ LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
byte t = exp.getType();
LogicalSchema fieldSchema = null;
String alias = null;
- // if type is primitive, just add to schema
- if (t != DataType.TUPLE && t != DataType.BAG) {
- LogicalFieldSchema f = new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid());
- schema.addField(f);
- continue;
- }
-
// for tuple and bag type, if there is projection, calculate schema of this field
- if (exp instanceof ProjectExpression) {
+ if (exp instanceof ProjectExpression) {
LogicalRelationalOperator op = null;
try{
op = ((ProjectExpression)exp).findReferent(this);
@@ -74,7 +67,14 @@
alias = s.getField(((ProjectExpression)exp).getColNum()).alias;
}
}
-
+
+ // if type is primitive, just add to schema
+ if (t != DataType.TUPLE && t != DataType.BAG) {
+ LogicalFieldSchema f = new LogicalSchema.LogicalFieldSchema(alias, fieldSchema, t, exp.getUid());
+ schema.addField(f);
+ continue;
+ }
+
// if flatten is set, set schema of tuple field to this schema
if (flattenFlags[i]) {
if (t == DataType.BAG) {
@@ -91,7 +91,8 @@
if (fieldSchema != null) {
List<LogicalFieldSchema> ll = fieldSchema.getFields();
for(LogicalFieldSchema f: ll) {
- schema.addField(f);
+ LogicalFieldSchema nf = new LogicalSchema.LogicalFieldSchema(alias+"::"+f.alias, f.schema, f.type, f.uid);
+ schema.addField(nf);
}
} else {
schema = null;
@@ -102,7 +103,6 @@
schema.addField(f);
}
}
-
return schema;
}
@@ -148,4 +148,12 @@
((LogicalPlanVisitor)v).visitLOGenerate(this);
}
+ /**
+  * @return "(Name: &lt;name&gt; Schema: &lt;schema&gt;)", where the schema is
+  *         obtained from getSchema()
+  */
+ @Override
+ public String toString() {
+     return "(Name: " + name + " Schema: " + getSchema() + ")";
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java Fri Mar 5 21:55:19 2010
@@ -19,6 +19,8 @@
import java.io.IOException;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
import org.apache.pig.experimental.plan.PlanVisitor;
@@ -28,13 +30,19 @@
* It can only be used in the inner plan of LOForEach
*
*/
-public class LOInnerLoad extends LogicalRelationalOperator {
- private int colNum;
+public class LOInnerLoad extends LogicalRelationalOperator {
+ private ProjectExpression prj;
private LOForEach foreach;
public LOInnerLoad(OperatorPlan plan, LOForEach foreach, int colNum) {
- super("LOInnerLoad", plan);
- this.colNum = colNum;
+ super("LOInnerLoad", plan);
+
+ // store column number as a ProjectExpression in a plan
+ // to be able to dynamically adjust column number during optimization
+ LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+ // we don't care about type, so set to -1
+ prj = new ProjectExpression(exp, (byte)-1, 0, colNum);
this.foreach = foreach;
}
@@ -47,9 +55,19 @@
LogicalPlan p = (LogicalPlan)foreach.getPlan();
try {
LogicalRelationalOperator op = (LogicalRelationalOperator)p.getPredecessors(foreach).get(0);
- if (op.getSchema() != null) {
- schema = new LogicalSchema();
- schema.addField(op.getSchema().getField(colNum));
+ LogicalSchema s = op.getSchema();
+ if (s != null) {
+ schema = new LogicalSchema();
+ long uid = prj.getUid();
+ for(int i=0; i<s.size(); i++) {
+ if (uid == s.getField(i).uid) {
+ schema.addField(s.getField(i));
+ }
+ }
+ }
+
+ if ( schema != null && schema.size() == 0) {
+ schema = null;
}
}catch(Exception e) {
throw new RuntimeException(e);
@@ -57,6 +75,10 @@
return schema;
}
+
+ /**
+  * @return the plan of the ProjectExpression held by this operator, cast
+  *         to LogicalExpressionPlan
+  */
+ public LogicalExpressionPlan getExpression() {
+     final LogicalExpressionPlan projectionPlan = (LogicalExpressionPlan)prj.getPlan();
+     return projectionPlan;
+ }
@Override
public boolean isEqual(Operator other) {
@@ -64,7 +86,7 @@
return false;
}
- return (colNum == ((LOInnerLoad)other).colNum);
+ return (getColNum() == ((LOInnerLoad)other).getColNum());
}
@Override
@@ -76,7 +98,7 @@
}
public int getColNum() {
- return colNum;
+ return prj.getColNum();
}
/**
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Fri Mar 5 21:55:19 2010
@@ -20,14 +20,18 @@
import java.io.IOException;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
public class LOLoad extends LogicalRelationalOperator {
private LogicalSchema scriptSchema;
private FileSpec fs;
+ private transient LoadPushDown loadFunc;
/**
*
@@ -42,6 +46,25 @@
fs = loader;
}
+ /**
+  * Returns the load function of this operator as a LoadPushDown.
+  * The function is instantiated from the file spec's func spec on demand
+  * and cached only when it implements LoadPushDown; otherwise nothing is
+  * cached and the cached value (null) is returned.
+  *
+  * @return the LoadPushDown instance, or null if the instantiated object
+  *         is not a LoadPushDown
+  * @throws RuntimeException if a ClassCastException is raised while
+  *         obtaining the function
+  */
+ public LoadPushDown getLoadPushDown() {
+     try {
+         if (loadFunc == null) {
+             Object obj = PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+             if (obj instanceof LoadPushDown) {
+                 loadFunc = (LoadPushDown)obj;
+             }
+         }
+
+         return loadFunc;
+     }catch (ClassCastException cce) {
+         // keep the original exception as the cause so the failing cast is not lost
+         throw new RuntimeException(fs.getFuncSpec() + " should implement the LoadFunc interface.", cce);
+     }
+ }
+
+ /**
+  * Replaces the script schema stored on this load operator.
+  *
+  * @param schema new value for the script schema
+  */
+ public void setScriptSchema(LogicalSchema schema) {
+     this.scriptSchema = schema;
+ }
+
/**
* Get the schema for this load. The schema will be either be what was
* given by the user in the script or what the load functions getSchema
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Fri Mar 5 21:55:19 2010
@@ -187,11 +187,6 @@
exprOp.setColumn(load.getColNum());
// set input to POProject to the predecessor of foreach
- List<PhysicalOperator> l = new ArrayList<PhysicalOperator>();
- LOForEach foreach = load.getLOForEach();
- Operator pred = foreach.getPlan().getPredecessors(foreach).get(0);
- l.add(logToPhyMap.get(pred));
- exprOp.setInputs(l);
logToPhyMap.put(load, exprOp);
currentPlan.add(exprOp);
@@ -214,19 +209,10 @@
// we need to translate each predecessor of LOGenerate into a physical plan.
// The physical plan should contain the expression plan for this predecessor plus
// the subtree starting with this predecessor
- for (int i=0; i<preds.size(); i++) {
+ for (int i=0; i<exps.size(); i++) {
currentPlan = new PhysicalPlan();
- // translate the predecessors into a physical plan
- PlanWalker childWalker = new SubtreeDependencyOrderWalker(inner, preds.get(i));
- pushWalker(childWalker);
- childWalker.walk(this);
- popWalker();
-
- // get the leaf of partially translated plan
- PhysicalOperator leaf = currentPlan.getLeaves().get(0);
-
- // add up the expressions
- childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+ // translate the expression plan
+ PlanWalker childWalker = new ReverseDependencyOrderWalker(exps.get(i));
pushWalker(childWalker);
childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
childWalker, gen, currentPlan, logToPhyMap ));
@@ -237,29 +223,34 @@
PhysicalOperator op = logToPhyMap.get(l);
if (l instanceof ProjectExpression) {
int input = ((ProjectExpression)l).getInputNum();
+
+ // for each sink projection, get its input logical plan and translate it
Operator pred = preds.get(input);
+ childWalker = new SubtreeDependencyOrderWalker(inner, pred);
+ pushWalker(childWalker);
+ childWalker.walk(this);
+ popWalker();
+
+ // get the physical operator of the leaf of input logical plan
+ PhysicalOperator leaf = logToPhyMap.get(pred);
+
if (pred instanceof LOInnerLoad) {
- List<PhysicalOperator> ll = currentPlan.getSuccessors(op);
- PhysicalOperator[] ll2 = null;
- if (ll != null) {
- ll2 = ll.toArray(new PhysicalOperator[0]);
- }
- currentPlan.remove(op);
- if (ll2 != null) {
- for(PhysicalOperator suc: ll2) {
- currentPlan.connect(leaf, suc);
- }
- }
-
- innerPlans.add(currentPlan);
-
- continue;
+ // if predecessor is only an LOInnerLoad, remove the project that
+ // comes from LOInnerLoad and change the column of project that
+ // comes from expression plan
+ currentPlan.remove(leaf);
+ logToPhyMap.remove(pred);
+ ((POProject)op).setColumn( ((POProject)leaf).getColumn() );
+
+ }else{
+ currentPlan.connect(leaf, op);
}
}
-
- currentPlan.connect(leaf, op);
- innerPlans.add(currentPlan);
- }
+ }
+
+
+
+ innerPlans.add(currentPlan);
}
currentPlan = currentPlans.pop();
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Fri Mar 5 21:55:19 2010
@@ -18,172 +18,18 @@
package org.apache.pig.experimental.logical.relational;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.pig.experimental.plan.BaseOperatorPlan;
-import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.OperatorSubPlan;
/**
* LogicalPlan is the logical view of relational operations Pig will execute
- * for a given script. Note that it contains only realtional operations.
+ * for a given script. Note that it contains only relational operations.
* All expressions will be contained in LogicalExpressionPlans inside
- * each relational operator. LogicalPlan provides operations for
- * removing and adding LogicalRelationalOperators. These will handle doing
- * all of the necessary add, remove, connect, and disconnect calls in
- * OperatorPlan. They will not handle patching up individual relational
- * operators. That will be handle by the various Patchers.
- *
+ * each relational operator.
*/
public class LogicalPlan extends BaseOperatorPlan {
/**
- * Add a relational operation to the plan.
- * @param before operator that will be before the new operator. This
- * operator should already be in the plan. If before is null then
- * the new operator will be a root.
- * @param newOper new operator to add. This operator should not already
- * be in the plan.
- * @param after operator that will be after the new operator. This
- * operator should already be in the plan. If after is null, then the
- * new operator will be a leaf.
- * @throws IOException if add is already in the plan, or before or after
- * are not in the plan.
- */
- public void add(LogicalRelationalOperator before,
- LogicalRelationalOperator newOper,
- LogicalRelationalOperator after) throws IOException {
- doAdd(before, newOper, after);
- }
-
- /**
- * Add a relational operation with multiple outputs to the plan.
- * @param before operators that will be before the new operator. These
- * operator should already be in the plan.
- * @param newOper new operator to add. This operator should not already
- * be in the plan.
- * @param after operator that will be after the new operator. This
- * operator should already be in the plan. If after is null, then the
- * new operator will be a leaf.
- * @throws IOException if add is already in the plan, or before or after
- * are not in the plan.
- */
- public void add(LogicalRelationalOperator[] before,
- LogicalRelationalOperator newOper,
- LogicalRelationalOperator after) throws IOException {
- doAdd(null, newOper, after);
-
- for (LogicalRelationalOperator op : before) {
- checkIn(op);
- connect(op, newOper);
- }
- }
-
- /**
- * Add a relational operation with multiple inputs to the plan.
- * @param before operator that will be before the new operator. This
- * operator should already be in the plan. If before is null then
- * the new operator will be a root.
- * @param newOper new operator to add. This operator should not already
- * be in the plan.
- * @param after operators that will be after the new operator. These
- * operator should already be in the plan.
- * @throws IOException if add is already in the plan, or before or after
- * are not in the plan.
- */
- public void add(LogicalRelationalOperator before,
- LogicalRelationalOperator newOper,
- LogicalRelationalOperator[] after) throws IOException {
- doAdd(before, newOper, null);
-
- for (LogicalRelationalOperator op : after) {
- checkIn(op);
- connect(newOper, op);
- }
- }
-
- /**
- * Add a relational operation to the plan when the caller wants to control
- * how the nodes are connected in the graph.
- * @param before operator that will be before the new operator. This
- * operator should already be in the plan. before should not be null.
- * the new operator will be a root.
- * @param beforeToPos Position in before's edges to connect newOper at.
- * @param beforeFromPos Position in newOps's edges to connect before at.
- * @param newOper new operator to add. This operator should not already
- * be in the plan.
- * @param afterToPos Position in after's edges to connect newOper at.
- * @param afterFromPos Position in newOps's edges to connect after at.
- * @param after operator that will be after the new operator. This
- * operator should already be in the plan. If after is null, then the
- * new operator will be a leaf.
- * @throws IOException if add is already in the plan, or before or after
- * are not in the plan.
- */
- public void add(LogicalRelationalOperator before,
- int beforeToPos,
- int beforeFromPos,
- LogicalRelationalOperator newOper,
- int afterToPos,
- int afterFromPos,
- LogicalRelationalOperator after) throws IOException {
- if (before != null) checkIn(before);
- if (after != null) checkIn(after);
- checkNotIn(newOper);
-
- add(newOper);
- if (before != null) connect(before, beforeToPos, newOper, beforeFromPos);
- if (after != null) connect(newOper, afterToPos, after, afterFromPos);
-
- }
-
- /**
- * Remove an operator from the logical plan. This call will take care
- * of disconnecting the operator, connecting the predecessor(s) and
- * successor(s) and patching up the plan.
- * @param op operator to be removed.
- * @throws IOException If the operator is not in the plan.
- */
- public void removeLogical(LogicalRelationalOperator op) throws IOException {
-
- checkIn(op);
- List<Operator> pred = getPredecessors(op);
- List<Operator> succ = getSuccessors(op);
- int predSz = pred.size();
- int succSz = succ.size();
- if (predSz > 1 && succSz > 1) {
- // Don't have a clue what to do here. We shouldn't have any
- // operators that have multiple inputs and multiple outputs.
- throw new IOException("Attempt to remove a node with multiple "
- + "inputs and outputs!");
- }
-
- // Disconnect and remove the given node.
- for (Operator p : pred) {
- disconnect(p, op);
- }
- for (Operator s : succ) {
- disconnect(op, s);
- }
- remove(op);
-
- // Now reconnect the before and after
- if (predSz > 1 && succSz == 1) {
- for (Operator p : pred) {
- connect(p, succ.get(0));
- }
- } else if (predSz == 1 && succSz >= 1) {
- for (Operator s : succ) {
- connect(pred.get(0), s);
- }
- }
-
- }
-
- /**
* Equality is checked by calling equals on every leaf in the plan. This
* assumes that plans are always connected graphs. It is somewhat
* inefficient since every leaf will test equality all the way to
@@ -205,29 +51,4 @@
return super.isEqual(other);
}
- private void doAdd(LogicalRelationalOperator before,
- LogicalRelationalOperator newOper,
- LogicalRelationalOperator after) throws IOException {
- if (before != null) checkIn(before);
- if (after != null) checkIn(after);
- checkNotIn(newOper);
-
- add(newOper);
- if (before != null) connect(before, newOper);
- if (after != null) connect(newOper, after);
- }
-
- private void checkIn(LogicalRelationalOperator op) throws IOException {
- if (!ops.contains(op)) {
- throw new IOException("Attempt to use operator " + op.getName() +
- " which is not in the plan.");
- }
- }
-
- private void checkNotIn(LogicalRelationalOperator op) throws IOException {
- if (ops.contains(op)) {
- throw new IOException("Attempt to add operator " + op.getName() +
- " which is already in the plan.");
- }
- }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Fri Mar 5 21:55:19 2010
@@ -140,5 +140,13 @@
if (!s.isEqual(os)) return false;
}
return true;
- }
+ }
+
+ /**
+ * Describes this operator as "(Name: &lt;name&gt; Schema: &lt;schema&gt;)",
+ * using the operator's name field and the value of getSchema().
+ * @return printable description of this operator
+ */
+ public String toString() {
+ return "(Name: " + name + " Schema: " + getSchema() + ")";
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Fri Mar 5 21:55:19 2010
@@ -23,6 +23,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.Pair;
+
/**
* Schema, from a logical perspective.
*/
@@ -60,16 +63,31 @@
return false;
}
}
+
+ public String toString() {
+ if( type == DataType.BAG ) {
+ if( schema == null ) {
+ return ( alias + "#" + uid + ":bag{}#" );
+ }
+ return ( alias + "#" + uid + ":bag{" + schema.toString() + "}" );
+ } else if( type == DataType.TUPLE ) {
+ if( schema == null ) {
+ return ( alias + "#" + uid + ":tuple{}" );
+ }
+ return ( alias + "#" + uid + ":tuple(" + schema.toString() + ")" );
+ }
+ return ( alias + "#" + uid + ":" + DataType.findTypeName(type) );
+ }
}
private List<LogicalFieldSchema> fields;
- private Map<String, Integer> aliases;
+ private Map<String, Pair<Integer, Boolean>> aliases;
public LogicalSchema() {
fields = new ArrayList<LogicalFieldSchema>();
- aliases = new HashMap<String, Integer>();
+ aliases = new HashMap<String, Pair<Integer, Boolean>>();
}
/**
@@ -79,16 +97,25 @@
public void addField(LogicalFieldSchema field) {
fields.add(field);
if (field.alias != null && !field.alias.equals("")) {
- aliases.put(field.alias, fields.size() - 1);
+ // put the full name of this field into aliases map
+ // boolean in the pair indicates if this alias is full name
+ aliases.put(field.alias, new Pair<Integer, Boolean>(fields.size()-1, true));
int index = 0;
+
+ // check and put short names into alias map if there is no conflict
while(index != -1) {
index = field.alias.indexOf("::", index);
if (index != -1) {
String a = field.alias.substring(index+2);
if (aliases.containsKey(a)) {
- aliases.remove(a);
+ // remove conflict if the conflict is not full name
+ // we can never remove full name
+ if (!aliases.get(a).second) {
+ aliases.remove(a);
+ }
}else{
- aliases.put(a, fields.size()-1);
+ // put alias into map and indicate it is a short name
+ aliases.put(a, new Pair<Integer, Boolean>(fields.size()-1, false));
}
index = index +2;
@@ -103,12 +130,12 @@
* @return field associated with alias, or null if no such field
*/
public LogicalFieldSchema getField(String alias) {
- Integer index = aliases.get(alias);
- if (index == null) {
+ Pair<Integer, Boolean> p = aliases.get(alias);
+ if (p == null) {
return null;
}
- return fields.get(index);
+ return fields.get(p.first);
}
/**
@@ -165,4 +192,16 @@
return null;
}
+ /**
+ * Joins the string form of every field in this schema with commas,
+ * in field order. An empty schema yields the empty string.
+ * @return comma separated list of the fields
+ */
+ public String toString() {
+ StringBuilder joined = new StringBuilder();
+ // separator is empty before the first field, "," afterwards
+ String separator = "";
+ for( LogicalFieldSchema current : fields ) {
+ joined.append( separator ).append( current.toString() );
+ separator = ",";
+ }
+ return joined.toString();
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Fri Mar 5 21:55:19 2010
@@ -38,6 +38,12 @@
import org.apache.pig.experimental.plan.optimizer.Transformer;
import org.apache.pig.impl.util.Pair;
+/**
+ * This rule moves a Filter above a Foreach.
+ * It checks whether the uids the filter operates on
+ * are present in the predecessor of the foreach.
+ * If so, it performs the transformation.
+ */
public class FilterAboveForeach extends Rule {
public FilterAboveForeach(String n) {
@@ -47,7 +53,7 @@
@Override
protected OperatorPlan buildPattern() {
// the pattern that this rule looks for
- // is foreach -> flatten -> filter
+ // is foreach -> filter
LogicalPlan plan = new LogicalPlan();
LogicalRelationalOperator foreach = new LOForEach(plan);
LogicalRelationalOperator filter = new LOFilter(plan);
@@ -85,25 +91,6 @@
// This would be a strange case
if( foreach == null ) return false;
- List<Operator> sinks = foreach.getInnerPlan().getSinks();
- if( ! ( sinks.size() == 1 && (sinks.get(0) instanceof LOGenerate ) ) ) {
- return false;
- }
-
-// LOGenerate generate = (LOGenerate)sinks.get(0);
-// // We check if we have any flatten
-// // Other cases are handled by other Optimizers
-// boolean hasFlatten = false;
-// for( boolean flattenFlag : generate.getFlattenFlags() ) {
-// if( flattenFlag ) {
-// hasFlatten = true;
-// break;
-// }
-// }
-//
-// if( !hasFlatten )
-// return false;
-
iter = matched.getOperators();
while( iter.hasNext() ) {
Operator op = iter.next();
@@ -147,6 +134,11 @@
return false;
}
+ /**
+ * Get all uids from Projections of this FilterOperator
+ * @param filter
+ * @return Set of uid
+ */
private Set<Long> getFilterProjectionUids( LOFilter filter ) {
Set<Long> uids = new HashSet<Long>();
if( filter != null ) {
@@ -163,7 +155,12 @@
return uids;
}
- // check if a relational operator contains all of the specified uids
+ /**
+ * checks if a relational operator contains all of the specified uids
+ * @param op LogicalRelational operator that should contain the uid
+ * @param uids Uids to check for
+ * @return true if given LogicalRelationalOperator has all the given uids
+ */
private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
LogicalSchema schema = op.getSchema();
List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
@@ -223,14 +220,17 @@
/*
* ForEachPred
* |
- * ForEach
+ * ForEach
* |
* Filter*
+ * ( These are filters
+ * which cannot be moved )
* |
* FilterPred
- * ( has to be a Filter or ForEach )
+ * ( is a Filter )
* |
* Filter
+ * ( To be moved )
* |
* FilterSuc
*
@@ -243,15 +243,23 @@
* ForEachPred
* |
* Filter
+ * ( After being Moved )
* |
* ForEach
* |
* Filter*
+ * ( These are filters
+ * which cannot be moved )
* |
* FilterPred
- * ( has to be a Filter or ForEach )
+ * ( is a Filter )
* |
* FilterSuc
+ *
+ * The plan above assumes we are moving a filter in the middle of the chain.
+ * If we are moving the first filter after ForEach, then
+ * -- Filter* (Kleene star) matches zero filters
+ * -- and ForEach is FilterPred
*/
Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Fri Mar 5 21:55:19 2010
@@ -19,6 +19,7 @@
package org.apache.pig.experimental.plan;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -27,6 +28,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
+import org.apache.pig.impl.logicalLayer.DotLOPrinter;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
public abstract class BaseOperatorPlan implements OperatorPlan {
@@ -258,4 +263,13 @@
return false;
}
+ /**
+ * Prints a three line banner followed by this plan, as rendered by
+ * PlanPrinter, to the given stream.
+ * @param ps stream to print to
+ * @param format not read by this implementation
+ * @param verbose not read by this implementation
+ * @throws IOException declared by this method; nothing in the body throws it directly
+ */
+ public void explain(PrintStream ps, String format, boolean verbose)
+ throws IOException {
+ final String rule = "#-----------------------------------------------";
+ ps.println(rule);
+ ps.println("# New Logical Plan:");
+ ps.println(rule);
+
+ new PlanPrinter(this, ps).visit();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java Fri Mar 5 21:55:19 2010
@@ -74,6 +74,15 @@
}
/**
+ * Remove an annotation
+ * @param key the key of the annotation
+ * @return the original value of the annotation
+ */
+ public Object removeAnnotation(String key) {
+ return annotations.remove(key);
+ }
+
+ /**
* This is like a shallow equals comparison.
* It returns true if two operators have equivalent properties even if they are
* different objects. Here properties mean equivalent plan and equivalent name.
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java Fri Mar 5 21:55:19 2010
@@ -117,7 +117,7 @@
* This is like a shallow comparison.
* Two plans are equal if they have equivalent operators and equivalent
* structure.
- * @param other object to compare
+ * @param other object to compare
* @return boolean if both the plans are equivalent
*/
public boolean isEqual( OperatorPlan other );
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorSubPlan.java Fri Mar 5 21:55:19 2010
@@ -53,6 +53,8 @@
public void add(Operator op) {
operators.add(op);
+ leaves.clear(); // operator set changed: empty leaves (presumably a cached view rebuilt on demand -- TODO confirm)
+ roots.clear(); // likewise empty roots after the operator set changed
}
public void connect(Operator from, int fromPos, Operator to, int toPos) {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java Fri Mar 5 21:55:19 2010
@@ -138,7 +138,6 @@
assertTrue("Failed to set a valid uid", false );
}
-
// run filter rule
Rule r = new FilterAboveForeach("FilterAboveFlatten");
Set<Rule> s = new HashSet<Rule>();
@@ -357,6 +356,8 @@
assertTrue( plan.getPredecessors(filter).contains(load) );
assertEquals( 1, plan.getPredecessors(filter).size() );
+ assertEquals( load.getSchema().getField(0).uid, namePrj2.getUid() );
+ assertEquals( namePrj2.getUid(), prjName.getUid() );
assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
@@ -528,6 +529,10 @@
assertTrue( plan.getPredecessors(filter).contains(filter2) );
assertEquals( 1, plan.getPredecessors(filter).size() );
+ assertEquals( load.getSchema().getField(0).uid, namePrj2.getUid() );
+ assertEquals( namePrj2.getUid(), name2Prj2.getUid() );
+ assertEquals( name2Prj2.getUid(), prjName.getUid() );
+
assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
@@ -694,6 +699,9 @@
assertTrue( plan.getPredecessors(foreach).contains(load) );
assertEquals( 1, plan.getPredecessors(foreach).size() );
+ assertFalse( prjCuisines.getUid() == namePrj2.getUid() );
+ assertFalse( prjCuisines.getUid() == name2Prj2.getUid() );
+
assertTrue( plan.getPredecessors(filter).contains(filter2) );
assertEquals( 1, plan.getPredecessors(filter).size() );
@@ -836,6 +844,8 @@
assertTrue( plan.getPredecessors(stor).contains(filter) );
assertEquals( 1, plan.getPredecessors(stor).size() );
+ assertFalse( prjCuisines.getUid() == namePrj2.getUid() );
+
assertTrue( plan.getPredecessors(filter).contains(foreach) );
assertEquals( 1, plan.getPredecessors(filter).size() );
@@ -985,6 +995,9 @@
assertTrue( plan.getPredecessors(foreach).contains(filter) );
assertEquals( 1, plan.getPredecessors(foreach).size() );
+ assertEquals( load.getSchema().getField(0).uid , namePrj2.getUid() );
+ assertEquals( namePrj2.getUid(), prjName.getUid() );
+
assertTrue( foreach.getInnerPlan().getSinks().contains(generate) );
assertEquals( 1, foreach.getInnerPlan().getSinks().size() );
assertTrue( foreach.getInnerPlan().getSources().contains(innerLoad) );
@@ -1004,201 +1017,4 @@
}
}
-
-// public class MyPrintVisitor extends AllExpressionVisitor {
-//
-// private PrintStream mStream = null;
-// private String TAB1 = " ";
-// private String TABMore = "| ";
-// private String LSep = "|\n|---";
-// private String USep = "| |\n| ";
-// private int levelCntr = -1;
-// private boolean isVerbose = true;
-//
-// public MyPrintVisitor(OperatorPlan plan, PrintStream ps) {
-// super(plan, new DepthFirstWalker(plan));
-// mStream = ps;
-// }
-//
-// @Override
-// protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
-// // TODO Auto-generated method stub
-// return null;
-// }
-//
-// @Override
-// public void visit() throws VisitorException {
-// try {
-// mStream.write(depthFirstLP().getBytes());
-// } catch (IOException e) {
-// throw new VisitorException(e);
-// }
-// }
-//
-// public void setVerbose(boolean verbose) {
-// isVerbose = verbose;
-// }
-//
-// public void print(OutputStream printer) throws VisitorException, IOException {
-// printer.write(depthFirstLP().getBytes());
-// }
-//
-// class LogicalRelationalOperatorCompare implements Comparator<LogicalRelationalOperator> {
-//
-// @Override
-// public int compare(LogicalRelationalOperator o1,
-// LogicalRelationalOperator o2) {
-// return 0;
-// }
-//
-// }
-//
-// protected String depthFirstLP() throws VisitorException, IOException {
-// StringBuilder sb = new StringBuilder();
-// List<Operator> leaves = plan.getSinks();
-// // Collections.sort(leaves, c)
-// for (Operator leaf : leaves) {
-// sb.append(depthFirst(leaf));
-// sb.append("\n");
-// }
-// //sb.delete(sb.length() - "\n".length(), sb.length());
-// //sb.delete(sb.length() - "\n".length(), sb.length());
-// return sb.toString();
-// }
-//
-// private String planString(LogicalPlan lp) throws VisitorException, IOException {
-// StringBuilder sb = new StringBuilder();
-// ByteArrayOutputStream baos = new ByteArrayOutputStream();
-// if(lp!=null)
-// lp.explain(baos, mStream);
-// else
-// return "";
-// sb.append(USep);
-// sb.append(shiftStringByTabs(baos.toString(), 2));
-// return sb.toString();
-// }
-//
-// private String planString(
-// List<LogicalPlan> logicalPlanList) throws VisitorException, IOException {
-// StringBuilder sb = new StringBuilder();
-// if(logicalPlanList!=null)
-// for (LogicalPlan lp : logicalPlanList) {
-// sb.append(planString(lp));
-// }
-// return sb.toString();
-// }
-//
-// private String depthFirst(Operator node) throws VisitorException, IOException {
-// StringBuilder sb = new StringBuilder(node.getName());
-// if(node instanceof LogicalExpression) {
-// sb.append(" FieldSchema: ");
-// try {
-// sb.append(((LogicalExpression)node).getUid());
-// sb.append(" Type: " + DataType.findTypeName(((LogicalExpression)node).getType()));
-// } catch (Exception e) {
-// sb.append("Caught Exception: " + e.getMessage());
-// }
-// } else if( node instanceof LogicalRelationalOperator ){
-// sb.append(" Schema: ");
-// try {
-// sb.append(((LogicalRelationalOperator)node).getSchema());
-// } catch (Exception e) {
-// sb.append("Caught exception: " + e.getMessage());
-// }
-// }
-//
-// sb.append("\n");
-//
-// if (isVerbose) {
-// if(node instanceof LOFilter){
-// sb.append(planString(((LOFilter)node).getComparisonPlan()));
-// }
-// else if(node instanceof LOForEach){
-// sb.append(planString(((LOForEach)node).getForEachPlans()));
-// }
-// else if(node instanceof LOGenerate){
-// sb.append(planString(((LOGenerate)node).getGeneratePlans()));
-//
-// }
-// else if(node instanceof LOCogroup){
-// MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
-// for (LogicalOperator lo : plans.keySet()) {
-// // Visit the associated plans
-// for (LogicalPlan plan : plans.get(lo)) {
-// sb.append(planString(plan));
-// }
-// }
-// }
-// else if(node instanceof LOJoin){
-// MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
-// for (LogicalOperator lo : plans.keySet()) {
-// // Visit the associated plans
-// for (LogicalPlan plan : plans.get(lo)) {
-// sb.append(planString(plan));
-// }
-// }
-// }
-// else if(node instanceof LOJoin){
-// MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
-// for (LogicalOperator lo : plans.keySet()) {
-// // Visit the associated plans
-// for (LogicalPlan plan : plans.get(lo)) {
-// sb.append(planString(plan));
-// }
-// }
-// }
-// else if(node instanceof LOSort){
-// sb.append(planString(((LOSort)node).getSortColPlans()));
-// }
-// else if(node instanceof LOSplitOutput){
-// sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
-// }
-// else if (node instanceof LOProject) {
-// sb.append("Input: ");
-// sb.append(((LOProject)node).getExpression().name());
-// }
-// }
-//
-// List<LogicalOperator> originalPredecessors = mPlan.getPredecessors(node);
-// if (originalPredecessors == null)
-// return sb.toString();
-//
-// List<LogicalOperator> predecessors = new ArrayList<LogicalOperator>(originalPredecessors);
-//
-// Collections.sort(predecessors);
-// int i = 0;
-// for (LogicalOperator pred : predecessors) {
-// i++;
-// String DFStr = depthFirst(pred);
-// if (DFStr != null) {
-// sb.append(LSep);
-// if (i < predecessors.size())
-// sb.append(shiftStringByTabs(DFStr, 2));
-// else
-// sb.append(shiftStringByTabs(DFStr, 1));
-// }
-// }
-// return sb.toString();
-// }
-//
-// private String shiftStringByTabs(String DFStr, int TabType) {
-// StringBuilder sb = new StringBuilder();
-// String[] spl = DFStr.split("\n");
-//
-// String tab = (TabType == 1) ? TAB1 : TABMore;
-//
-// sb.append(spl[0] + "\n");
-// for (int i = 1; i < spl.length; i++) {
-// sb.append(tab);
-// sb.append(spl[i]);
-// sb.append("\n");
-// }
-// return sb.toString();
-// }
-//
-// private void dispTabs() {
-// for (int i = 0; i < levelCntr; i++)
-// System.out.print(TAB1);
-// }
-// }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java?rev=919634&r1=919633&r2=919634&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java Fri Mar 5 21:55:19 2010
@@ -120,14 +120,18 @@
LOFilter D = new LOFilter(lp, filterPlan);
D.neverUseForRealSetSchema(cschema);
// Connect D to B, since the transform has happened.
- lp.add(B, D, (LogicalRelationalOperator)null);
+ lp.add(D);
+ lp.connect(B, D);
// Now add in C, connected to A and D.
- lp.add(new LogicalRelationalOperator[] {A, D}, C, null);
+ lp.add(C);
+ lp.connect(A, C);
+ lp.connect(D, C);
changedPlan = new LogicalPlan();
changedPlan.add(D);
- changedPlan.add(D, C, (LogicalRelationalOperator)null);
+ changedPlan.add(C);
+ changedPlan.connect(D, C);
}
private static class SillySameVisitor extends AllSameVisitor {
@@ -156,7 +160,6 @@
public void testAllSameVisitor() throws IOException {
SillySameVisitor v = new SillySameVisitor(lp);
v.visit();
- System.out.println(v.toString());
assertTrue("LOLoad LOJoin LOLoad LOFilter ".equals(v.toString()) ||
"LOLoad LOFilter LOJoin LOLoad ".equals(v.toString()));