You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by gd...@apache.org on 2012/09/13 16:55:38 UTC
svn commit: r1384352 [2/4] - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Sep 13 14:55:36 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -54,6 +57,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -94,7 +98,7 @@ import org.apache.pig.parser.SourceLocat
public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
private static final Log LOG = LogFactory.getLog(LogToPhyTranslationVisitor.class);
-
+
public LogToPhyTranslationVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
currentPlan = new PhysicalPlan();
@@ -111,7 +115,7 @@ public class LogToPhyTranslationVisitor
protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
protected PigContext pc;
-
+
public void setPigContext(PigContext pc) {
this.pc = pc;
}
@@ -119,16 +123,16 @@ public class LogToPhyTranslationVisitor
public Map<Operator, PhysicalOperator> getLogToPhyMap() {
return logToPhyMap;
}
-
+
public PhysicalPlan getPhysicalPlan() {
return currentPlan;
}
-
+
@Override
public void visit(LOLoad loLoad) throws FrontendException {
String scope = DEFAULT_SCOPE;
-// System.err.println("Entering Load");
- // The last parameter here is set to true as we assume all files are
+ // System.err.println("Entering Load");
+ // The last parameter here is set to true as we assume all files are
// splittable due to LoadStore Refactor
POLoad load = new POLoad(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), loLoad.getLoadFunc());
@@ -145,7 +149,7 @@ public class LogToPhyTranslationVisitor
// case it might have a store as a predecessor.
List<Operator> op = loLoad.getPlan().getPredecessors(loLoad);
PhysicalOperator from;
-
+
if(op != null) {
from = logToPhyMap.get(op.get(0));
try {
@@ -156,13 +160,13 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
-// System.err.println("Exiting Load");
+ // System.err.println("Exiting Load");
}
-
+
@Override
- public void visit(LONative loNative) throws FrontendException{
+ public void visit(LONative loNative) throws FrontendException{
String scope = DEFAULT_SCOPE;
-
+
PONative poNative = new PONative(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)));
poNative.addOriginalLocation(loNative.getAlias(), loNative.getLocation());
@@ -172,7 +176,7 @@ public class LogToPhyTranslationVisitor
logToPhyMap.put(loNative, poNative);
currentPlan.add(poNative);
-
+
List<Operator> op = loNative.getPlan().getPredecessors(loNative);
PhysicalOperator from;
@@ -183,7 +187,7 @@ public class LogToPhyTranslationVisitor
String msg = "Did not find a predecessor for Native." ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
}
-
+
try {
currentPlan.connect(from, poNative);
} catch (PlanException e) {
@@ -191,13 +195,13 @@ public class LogToPhyTranslationVisitor
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
+
}
-
+
@Override
public void visit(LOFilter filter) throws FrontendException {
String scope = DEFAULT_SCOPE;
-// System.err.println("Entering Filter");
+ // System.err.println("Entering Filter");
POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), filter.getRequestedParallelism());
poFilter.addOriginalLocation(filter.getAlias(), filter.getLocation());
@@ -208,8 +212,8 @@ public class LogToPhyTranslationVisitor
currentPlan = new PhysicalPlan();
-// PlanWalker childWalker = currentWalker
-// .spawnChildWalker(filter.getFilterPlan());
+ // PlanWalker childWalker = currentWalker
+ // .spawnChildWalker(filter.getFilterPlan());
PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(filter.getFilterPlan());
pushWalker(childWalker);
//currentWalker.walk(this);
@@ -231,7 +235,7 @@ public class LogToPhyTranslationVisitor
String msg = "Did not find a predecessor for Filter." ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
}
-
+
try {
currentPlan.connect(from, poFilter);
} catch (PlanException e) {
@@ -239,11 +243,11 @@ public class LogToPhyTranslationVisitor
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
+
translateSoftLinks(filter);
-// System.err.println("Exiting Filter");
+ // System.err.println("Exiting Filter");
}
-
+
@Override
public void visit(LOSort sort) throws FrontendException {
String scope = DEFAULT_SCOPE;
@@ -282,17 +286,17 @@ public class LogToPhyTranslationVisitor
// sort.setRequestedParallelism(s.getType());
logToPhyMap.put(sort, poSort);
currentPlan.add(poSort);
- List<Operator> op = sort.getPlan().getPredecessors(sort);
+ List<Operator> op = sort.getPlan().getPredecessors(sort);
PhysicalOperator from;
-
+
if(op != null) {
from = logToPhyMap.get(op.get(0));
} else {
int errCode = 2051;
String msg = "Did not find a predecessor for Sort." ;
- throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
}
-
+
try {
currentPlan.connect(from, poSort);
} catch (PlanException e) {
@@ -303,7 +307,254 @@ public class LogToPhyTranslationVisitor
poSort.setResultType(DataType.BAG);
}
-
+
+ /**
+ * Transformation from Logical to Physical Plan involves the following steps:
+ * First, a random number is generated to link a POCounter with its PORank,
+ * thereby avoiding possible collisions between parallel rank operations.
+ * Then, if it is row number mode:
+ * <pre>
+ * In case of a RANK operation (row number mode), two steps are used:
+ * 1.- Each tuple is counted sequentially on each mapper, producing global counters
+ * 2.- Global counters are gathered and summed; each tuple uses the respective counter value
+ * in order to calculate the corresponding rank value.
+ * </pre>
+ * or not:
+ * <pre>
+ * In case of a RANK BY operation, five steps are necessary:
+ * 1.- Group by the fields involved in the rank operation: POPackage
+ * 2.- In case of multiple fields, the key (group field) is flattened: POForEach
+ * 3.- Sort operation by the fields available after flattening: POSort
+ * 4.- Each group is sequentially counted on each mapper through a global counter: POCounter
+ * 5.- Global counters are summed and passed to the rank operation: PORank
+ * </pre>
+ * @param loRank describes whether the rank operation is in row number mode
+ * or is a rank by (dense or not)
+ **/
+ @Override
+ public void visit(LORank loRank) throws FrontendException {
+ String scope = DEFAULT_SCOPE;
+ PORank poRank;
+ POCounter poCounter;
+
+ Random randomGenerator = new Random();
+ Long operationID = Math.abs(randomGenerator.nextLong());
+
+ try {
+ // Physical operations for RANK operator:
+ // In case of a RANK BY operation, five steps are necessary:
+ // 1.- Group by the fields involved in the rank operation: POPackage
+ // 2.- In case of multiple fields, the key (group field) is flattened: POForEach
+ // 3.- Sort operation by the fields available after flattening: POSort
+ // 4.- Each group is sequentially counted on each mapper through a global counter: POCounter
+ // 5.- Global counters are summed and passed to the rank operation: PORank
+ if(!loRank.isRowNumber()) {
+
+ boolean[] flags = {false};
+
+ MultiMap<Integer, LogicalExpressionPlan> expressionPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+ for(int i = 0 ; i < loRank.getRankColPlans().size() ; i++)
+ expressionPlans.put(i,loRank.getRankColPlans());
+
+ POPackage poPackage = compileToLR_GR_PackTrio(loRank, null, flags, expressionPlans);
+ poPackage.setPackageType(PackageType.GROUP);
+ translateSoftLinks(loRank);
+
+ List<Boolean> flattenLst = Arrays.asList(true, false);
+
+ PhysicalPlan fep1 = new PhysicalPlan();
+ POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+ feproj1.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ feproj1.setColumn(0);
+ feproj1.setResultType(poPackage.getKeyType());
+ feproj1.setStar(false);
+ feproj1.setOverloaded(false);
+ fep1.add(feproj1);
+
+
+ PhysicalPlan fep2 = new PhysicalPlan();
+ POProject feproj2 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+ feproj2.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ feproj2.setColumn(1);
+ feproj2.setResultType(DataType.BAG);
+ feproj2.setStar(false);
+ feproj2.setOverloaded(false);
+ fep2.add(feproj2);
+ List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+
+ POForEach poForEach = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans, flattenLst);
+
+ List<LogicalExpressionPlan> rankPlans = loRank.getRankColPlans();
+ byte[] newTypes = new byte[rankPlans.size()];
+
+ for(int i = 0; i < rankPlans.size(); i++) {
+ LogicalExpressionPlan loep = rankPlans.get(i);
+ Iterator<Operator> inpOpers = loep.getOperators();
+
+ while(inpOpers.hasNext()) {
+ Operator oper = inpOpers.next();
+ newTypes[i] = ((ProjectExpression) oper).getType();
+ }
+ }
+
+ List<PhysicalPlan> newPhysicalPlan = new ArrayList<PhysicalPlan>();
+ List<Boolean> newOrderPlan = new ArrayList<Boolean>();
+
+ for(int i = 0; i < loRank.getRankColPlans().size(); i++) {
+ PhysicalPlan fep3 = new PhysicalPlan();
+ POProject feproj3 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+ feproj3.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ feproj3.setColumn(i);
+ feproj3.setResultType(newTypes[i]);
+ feproj3.setStar(false);
+ feproj3.setOverloaded(false);
+ fep3.add(feproj3);
+
+ newPhysicalPlan.add(fep3);
+ newOrderPlan.add(loRank.getAscendingCol().get(i));
+ }
+
+ POSort poSort;
+ poSort = new POSort(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), -1, null,
+ newPhysicalPlan, newOrderPlan, null);
+ poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+
+
+ poCounter = new POCounter(
+ new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), -1 , null,
+ newPhysicalPlan, newOrderPlan);
+
+ poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ poCounter.setResultType(DataType.TUPLE);
+ poCounter.setIsRowNumber(loRank.isRowNumber());
+ poCounter.setIsDenseRank(loRank.isDenseRank());
+ poCounter.setOperationID(String.valueOf(operationID));
+
+ poRank = new PORank(
+ new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), -1 , null,
+ newPhysicalPlan, newOrderPlan);
+
+ poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ poRank.setResultType(DataType.TUPLE);
+ poRank.setOperationID(String.valueOf(operationID));
+
+ List<Boolean> flattenLst2 = Arrays.asList(false, true);
+
+ PhysicalPlan fep12 = new PhysicalPlan();
+ POProject feproj12 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+ feproj12.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ feproj12.setColumn(0);
+ feproj12.setResultType(DataType.LONG);
+ feproj12.setStar(false);
+ feproj12.setOverloaded(false);
+ fep12.add(feproj12);
+
+
+ PhysicalPlan fep22 = new PhysicalPlan();
+ POProject feproj22 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+ feproj22.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ feproj22.setColumn(loRank.getRankColPlans().size()+1);
+ feproj22.setResultType(DataType.BAG);
+ feproj22.setStar(false);
+ feproj22.setOverloaded(false);
+ fep22.add(feproj22);
+ List<PhysicalPlan> fePlans2 = Arrays.asList(fep12, fep22);
+
+ POForEach poForEach2 = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans2, flattenLst2);
+
+ currentPlan.add(poForEach);
+ currentPlan.add(poSort);
+ currentPlan.add(poCounter);
+ currentPlan.add(poRank);
+ currentPlan.add(poForEach2);
+
+ try {
+ currentPlan.connect(poPackage, poForEach);
+ currentPlan.connect(poForEach, poSort);
+ currentPlan.connect(poSort, poCounter);
+ currentPlan.connect(poCounter, poRank);
+ currentPlan.connect(poRank, poForEach2);
+ } catch (PlanException e) {
+ throw new LogicalToPhysicalTranslatorException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
+ }
+
+ logToPhyMap.put(loRank, poForEach2);
+
+ // In case of a RANK operation, two steps are used:
+ // 1.- Each tuple is counted sequentially on each mapper, producing global counters
+ // 2.- Global counters are gathered and summed; each tuple uses the respective counter value
+ // in order to calculate the corresponding rank value.
+ } else {
+
+ List<LogicalExpressionPlan> logPlans = loRank.getRankColPlans();
+ List<PhysicalPlan> rankPlans = new ArrayList<PhysicalPlan>(logPlans.size());
+
+ // convert all the logical expression plans to physical expression plans
+ currentPlans.push(currentPlan);
+ for (LogicalExpressionPlan plan : logPlans) {
+ currentPlan = new PhysicalPlan();
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(plan);
+ pushWalker(childWalker);
+ childWalker.walk(new ExpToPhyTranslationVisitor( currentWalker.getPlan(),
+ childWalker, loRank, currentPlan, logToPhyMap));
+ rankPlans.add(currentPlan);
+ popWalker();
+ }
+ currentPlan = currentPlans.pop();
+
+
+
+ poCounter = new POCounter(
+ new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), -1 , null,
+ rankPlans, loRank.getAscendingCol());
+
+ poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ poCounter.setResultType(DataType.TUPLE);
+ poCounter.setIsRowNumber(loRank.isRowNumber());
+ poCounter.setIsDenseRank(loRank.isDenseRank());
+ poCounter.setOperationID(String.valueOf(operationID));
+
+ poRank = new PORank(
+ new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), -1 , null,
+ rankPlans, loRank.getAscendingCol());
+
+ poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+ poRank.setResultType(DataType.TUPLE);
+ poRank.setOperationID(String.valueOf(operationID));
+
+ currentPlan.add(poCounter);
+ currentPlan.add(poRank);
+
+ List<Operator> op = loRank.getPlan().getPredecessors(loRank);
+ PhysicalOperator from;
+
+ if(op != null) {
+ from = logToPhyMap.get(op.get(0));
+ } else {
+ int errCode = 2051;
+ String msg = "Did not find a predecessor for Rank." ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+ }
+
+ currentPlan.connect(from, poCounter);
+ currentPlan.connect(poCounter, poRank);
+
+ logToPhyMap.put(loRank, poRank);
+ }
+
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ }
+
@Override
public void visit(LOCross cross) throws FrontendException {
String scope = DEFAULT_SCOPE;
@@ -334,20 +585,20 @@ public class LogToPhyTranslationVisitor
poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(poGlobal);
currentPlan.add(poPackage);
-
+
int count = 0;
-
+
try {
currentPlan.connect(poGlobal, poPackage);
List<Boolean> flattenLst = Arrays.asList(true, true);
-
+
for (Operator op : inputs) {
PhysicalPlan fep1 = new PhysicalPlan();
ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
ce1.setValue(inputs.size());
ce1.setResultType(DataType.INTEGER);
fep1.add(ce1);
-
+
ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
ce2.setValue(count);
ce2.setResultType(DataType.INTEGER);
@@ -357,7 +608,7 @@ public class LogToPhyTranslationVisitor
ce1val.set(1,count);
ce1.setValue(ce1val);
ce1.setResultType(DataType.TUPLE);*/
-
+
POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
gfc.setResultType(DataType.BAG);
@@ -366,7 +617,7 @@ public class LogToPhyTranslationVisitor
/*fep1.add(gfc);
fep1.connect(ce1, gfc);
fep1.connect(ce2, gfc);*/
-
+
PhysicalPlan fep2 = new PhysicalPlan();
POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
@@ -375,12 +626,12 @@ public class LogToPhyTranslationVisitor
feproj.setOverloaded(false);
fep2.add(feproj);
List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
-
+
POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
currentPlan.connect(logToPhyMap.get(op), fe);
-
+
POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cross
.getRequestedParallelism());
@@ -395,13 +646,13 @@ public class LogToPhyTranslationVisitor
lrp1.add(lrproj1);
lrPlans.add(lrp1);
}
-
+
physOp.setCross(true);
physOp.setIndex(count++);
physOp.setKeyType(DataType.TUPLE);
physOp.setPlans(lrPlans);
physOp.setResultType(DataType.TUPLE);
-
+
currentPlan.add(physOp);
currentPlan.connect(fe, physOp);
currentPlan.connect(physOp, poGlobal);
@@ -415,7 +666,7 @@ public class LogToPhyTranslationVisitor
String msg = "Unable to set index on newly create POLocalRearrange.";
throw new VisitorException(msg, errCode, PigException.BUG, e);
}
-
+
poPackage.setKeyType(DataType.TUPLE);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
@@ -424,7 +675,7 @@ public class LogToPhyTranslationVisitor
inner[i] = true;
}
poPackage.setInner(inner);
-
+
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flattenLst = new ArrayList<Boolean>();
for(int i=1;i<=count;i++){
@@ -437,7 +688,7 @@ public class LogToPhyTranslationVisitor
fePlans.add(fep1);
flattenLst.add(true);
}
-
+
POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
currentPlan.add(fe);
@@ -451,28 +702,28 @@ public class LogToPhyTranslationVisitor
logToPhyMap.put(cross, fe);
}
}
-
+
@Override
public void visit(LOStream stream) throws FrontendException {
String scope = DEFAULT_SCOPE;
POStream poStream = new POStream(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), stream.getExecutableManager(),
+ .getNextNodeId(scope)), stream.getExecutableManager(),
stream.getStreamingCommand(), this.pc.getProperties());
poStream.addOriginalLocation(stream.getAlias(), stream.getLocation());
currentPlan.add(poStream);
logToPhyMap.put(stream, poStream);
-
+
List<Operator> op = stream.getPlan().getPredecessors(stream);
PhysicalOperator from;
if(op != null) {
from = logToPhyMap.get(op.get(0));
- } else {
+ } else {
int errCode = 2051;
String msg = "Did not find a predecessor for Stream." ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
}
-
+
try {
currentPlan.connect(from, poStream);
} catch (PlanException e) {
@@ -485,10 +736,10 @@ public class LogToPhyTranslationVisitor
@Override
public void visit(LOInnerLoad load) throws FrontendException {
String scope = DEFAULT_SCOPE;
-
+
POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)));
-
+
LogicalSchema s = load.getSchema();
if (load.sourceIsBag()) {
@@ -518,25 +769,25 @@ public class LogToPhyTranslationVisitor
exprOp.setColumn(load.getColNum());
}
// set input to POProject to the predecessor of foreach
-
+
logToPhyMap.put(load, exprOp);
currentPlan.add(exprOp);
}
-
+
@Override
public void visit(LOForEach foreach) throws FrontendException {
String scope = DEFAULT_SCOPE;
-
+
List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
-
+
org.apache.pig.newplan.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
-
+
List<LogicalExpressionPlan> exps = gen.getOutputPlans();
List<Operator> preds = inner.getPredecessors(gen);
currentPlans.push(currentPlan);
-
+
// we need to translate each predecessor of LOGenerate into a physical plan.
// The physical plan should contain the expression plan for this predecessor plus
// the subtree starting with this predecessor
@@ -546,25 +797,25 @@ public class LogToPhyTranslationVisitor
PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i));
pushWalker(childWalker);
childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
- childWalker, gen, currentPlan, logToPhyMap));
+ childWalker, gen, currentPlan, logToPhyMap));
popWalker();
-
+
List<Operator> leaves = exps.get(i).getSinks();
for(Operator l: leaves) {
PhysicalOperator op = logToPhyMap.get(l);
if (l instanceof ProjectExpression ) {
- int input = ((ProjectExpression)l).getInputNum();
-
+ int input = ((ProjectExpression)l).getInputNum();
+
// for each sink projection, get its input logical plan and translate it
Operator pred = preds.get(input);
childWalker = new SubtreeDependencyOrderWalker(inner, pred);
pushWalker(childWalker);
childWalker.walk(this);
popWalker();
-
+
// get the physical operator of the leaf of input logical plan
- PhysicalOperator leaf = logToPhyMap.get(pred);
-
+ PhysicalOperator leaf = logToPhyMap.get(pred);
+
if (pred instanceof LOInnerLoad) {
// if predecessor is only an LOInnerLoad, remove the project that
// comes from LOInnerLoad and change the column of project that
@@ -582,19 +833,19 @@ public class LogToPhyTranslationVisitor
}else {
((POProject)op).setColumn(leafProj.getColumn() );
}
-
+
} catch (ExecException e) {
throw new FrontendException(foreach, "Cannot get column from "+leaf, 2230, e);
}
- }else{
+ }else{
currentPlan.connect(leaf, op);
}
}
}
innerPlans.add(currentPlan);
}
-
+
currentPlan = currentPlans.pop();
// PhysicalOperator poGen = new POGenerate(new OperatorKey("",
@@ -629,14 +880,13 @@ public class LogToPhyTranslationVisitor
translateSoftLinks(foreach);
}
-
+
/**
- * This function takes in a List of LogicalExpressionPlan and converts them to
+ * This function takes in a List of LogicalExpressionPlan and converts them to
* a list of PhysicalPlans
- *
* @param plans
* @return
- * @throws FrontendException
+ * @throws FrontendException
*/
private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
List<LogicalExpressionPlan> plans ) throws FrontendException {
@@ -644,40 +894,40 @@ public class LogToPhyTranslationVisitor
if( plans == null || plans.size() == 0 ) {
return exprPlans;
}
-
+
// Save the current plan onto stack
currentPlans.push(currentPlan);
-
+
for( LogicalExpressionPlan lp : plans ) {
currentPlan = new PhysicalPlan();
-
- // We spawn a new Dependency Walker and use it
+
+ // We spawn a new Dependency Walker and use it
// PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(lp);
-
+
// Save the old walker and use childWalker as current Walker
pushWalker(childWalker);
-
+
// We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
currentWalker.walk(
- new ExpToPhyTranslationVisitor(
- currentWalker.getPlan(),
+ new ExpToPhyTranslationVisitor(
+ currentWalker.getPlan(),
childWalker, loj, currentPlan, logToPhyMap) );
-
+
exprPlans.add(currentPlan);
popWalker();
}
-
+
// Pop the current plan back out
currentPlan = currentPlans.pop();
return exprPlans;
}
-
+
@Override
public void visit(LOStore loStore) throws FrontendException {
String scope = DEFAULT_SCOPE;
-// System.err.println("Entering Store");
+ // System.err.println("Entering Store");
POStore store = new POStore(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)));
store.addOriginalLocation(loStore.getAlias(), loStore.getLocation());
@@ -687,17 +937,17 @@ public class LogToPhyTranslationVisitor
store.setSortInfo(loStore.getSortInfo());
store.setIsTmpStore(loStore.isTmpStore());
store.setStoreFunc(loStore.getStoreFunc());
-
+
store.setSchema(Util.translateSchema( loStore.getSchema() ));
currentPlan.add(store);
-
- List<Operator> op = loStore.getPlan().getPredecessors(loStore);
+
+ List<Operator> op = loStore.getPlan().getPredecessors(loStore);
PhysicalOperator from = null;
-
+
if(op != null) {
from = logToPhyMap.get(op.get(0));
- // TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready
+// TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready
// SortInfo sortInfo = null;
// // if store's predecessor is limit,
// // check limit's predecessor
@@ -725,9 +975,9 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
logToPhyMap.put(loStore, store);
-// System.err.println("Exiting Store");
+ // System.err.println("Exiting Store");
}
-
+
@Override
public void visit( LOCogroup cg ) throws FrontendException {
switch (cg.getGroupType()) {
@@ -747,7 +997,7 @@ public class LogToPhyTranslationVisitor
}
translateSoftLinks(cg);
}
-
+
private void translateCollectedCogroup(LOCogroup cg) throws FrontendException {
// can have only one input
LogicalRelationalOperator pred = (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
@@ -756,7 +1006,7 @@ public class LogToPhyTranslationVisitor
DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
physOp.addOriginalLocation(cg.getAlias(), cg.getLocation());
List<PhysicalPlan> pExprPlans = translateExpressionPlans(cg, exprPlans);
-
+
try {
physOp.setPlans(pExprPlans);
} catch (PlanException pe) {
@@ -775,7 +1025,7 @@ public class LogToPhyTranslationVisitor
physOp.setResultType(DataType.TUPLE);
currentPlan.add(physOp);
-
+
try {
currentPlan.connect(logToPhyMap.get(pred), physOp);
} catch (PlanException e) {
@@ -786,23 +1036,23 @@ public class LogToPhyTranslationVisitor
logToPhyMap.put(cg, physOp);
}
-
- private POMergeCogroup compileToMergeCogrp(LogicalRelationalOperator relationalOp,
+
+ private POMergeCogroup compileToMergeCogrp(LogicalRelationalOperator relationalOp,
MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
-
+
List<Operator> inputs = relationalOp.getPlan().getPredecessors(relationalOp);
- // LocalRearrange corresponding to each of input
+ // LocalRearrange corresponding to each of input
// LR is needed to extract keys out of the tuples.
-
+
POLocalRearrange[] innerLRs = new POLocalRearrange[inputs.size()];
int count = 0;
List<PhysicalOperator> inpPOs = new ArrayList<PhysicalOperator>(inputs.size());
-
+
for (int i=0;i<inputs.size();i++) {
Operator op = inputs.get(i);
PhysicalOperator physOp = logToPhyMap.get(op);
inpPOs.add(physOp);
-
+
List<LogicalExpressionPlan> plans = innerPlans.get(i);
POLocalRearrange poInnerLR = new POLocalRearrange(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
poInnerLR.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
@@ -824,16 +1074,16 @@ public class LogToPhyTranslationVisitor
String msg = "Unable to set index on newly create POLocalRearrange.";
throw new VisitorException(msg, errCode, PigException.BUG, e1);
}
- poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE :
+ poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE :
exprPlans.get(0).getLeaves().get(0).getResultType());
poInnerLR.setResultType(DataType.TUPLE);
}
-
+
POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelism());
return poCogrp;
}
-
+
private void translateMergeCogroup(LOCogroup cg) throws FrontendException {
if(!validateMergeCogrp(cg.getInner())){
throw new LogicalToPhysicalTranslatorException("Inner is not " +
@@ -857,33 +1107,33 @@ public class LogToPhyTranslationVisitor
}
logToPhyMap.put(cg, poCogrp);
}
-
+
private boolean validateMergeCogrp(boolean[] innerFlags){
-
+
for(boolean flag : innerFlags){
if(flag)
return false;
}
return true;
}
-
+
@Override
public void visit(LOJoin loj) throws FrontendException {
String scope = DEFAULT_SCOPE;
-
- // List of join predicates
+
+ // List of join predicates
List<Operator> inputs = loj.getPlan().getPredecessors(loj);
-
+
// mapping of inner join physical plans corresponding to inner physical operators.
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
-
+
// Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate.
List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
-
+
// List of physical operator corresponding to join predicates.
List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
-
+
// Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate.
List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
@@ -891,18 +1141,18 @@ public class LogToPhyTranslationVisitor
String alias = loj.getAlias();
SourceLocation location = loj.getLocation();
int parallel = loj.getRequestedParallelism();
-
+
for (int i=0;i<inputs.size();i++) {
Operator op = inputs.get(i);
PhysicalOperator physOp = logToPhyMap.get(op);
inp.add(physOp);
List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>)loj.getJoinPlan(i);
-
+
List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
ppLists.add(exprPlans);
joinPlans.put(physOp, exprPlans);
-
+
// Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
for(PhysicalPlan exprPlan : exprPlans)
@@ -924,7 +1174,7 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
skj.setResultType(DataType.TUPLE);
-
+
for (int i=0; i < inputs.size(); i++) {
Operator op = inputs.get(i);
if (!innerFlags[i]) {
@@ -946,7 +1196,7 @@ public class LogToPhyTranslationVisitor
skj.addSchema(null);
}
}
-
+
currentPlan.add(skj);
for (Operator op : inputs) {
@@ -986,22 +1236,22 @@ public class LogToPhyTranslationVisitor
SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
keySchemas[i] = keyToGen;
}
-
+
int fragment = 0;
POFRJoin pfrj;
try {
boolean isLeftOuter = false;
- // We dont check for bounds issue as we assume that a join
+ // We dont check for bounds issue as we assume that a join
// involves atleast two inputs
isLeftOuter = !innerFlags[1];
-
+
Tuple nullTuple = null;
if( isLeftOuter ) {
try {
- // We know that in a Left outer join its only a two way
- // join, so we assume index of 1 for the right input
- LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
-
+ // We know that in a Left outer join its only a two way
+ // join, so we assume index of 1 for the right input
+ LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+
// We check if we have a schema before the join
if(inputSchema == null) {
int errCode = 1109;
@@ -1009,21 +1259,21 @@ public class LogToPhyTranslationVisitor
"on which outer join is desired should have a valid schema";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
}
-
- // Using the schema we decide the number of columns/fields
+
+ // Using the schema we decide the number of columns/fields
// in the nullTuple
nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
for(int j = 0; j < inputSchema.size(); j++) {
nullTuple.set(j, null);
}
-
+
} catch( FrontendException e ) {
int errCode = 2104;
String msg = "Error while determining the schema of input";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
-
+
pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
parallel,
inp,
@@ -1055,9 +1305,9 @@ public class LogToPhyTranslationVisitor
logToPhyMap.put(loj, pfrj);
} else if ( (loj.getJoinType() == LOJoin.JOINTYPE.MERGE || loj.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE)
&& (new MapSideMergeValidator().validateMapSideMerge(inputs,loj.getPlan()))) {
-
+
PhysicalOperator smj;
- boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ;
+ boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ;
if(usePOMergeJoin){
// We register the merge join schema information for code generation
@@ -1111,7 +1361,7 @@ public class LogToPhyTranslationVisitor
// in all other cases we fall back to POMergeCogroup + Flattening FEs
smj = compileToMergeCogrp(loj, loj.getExpressionPlans());
}
-
+
smj.setResultType(DataType.TUPLE);
currentPlan.add(smj);
smj.addOriginalLocation(alias, location);
@@ -1135,9 +1385,9 @@ public class LogToPhyTranslationVisitor
} catch (PlanException e) {
throw new LogicalToPhysicalTranslatorException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
}
- logToPhyMap.put(loj, fe);
+ logToPhyMap.put(loj, fe);
}
-
+
return;
}
else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
@@ -1155,8 +1405,8 @@ public class LogToPhyTranslationVisitor
}
translateSoftLinks(loj);
}
-
- private POPackage compileToLR_GR_PackTrio(LogicalRelationalOperator relationalOp, String customPartitioner,
+
+ private POPackage compileToLR_GR_PackTrio(LogicalRelationalOperator relationalOp, String customPartitioner,
boolean[] innerFlags, MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
@@ -1221,18 +1471,18 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
-
+
poPackage.setKeyType(type);
poPackage.setResultType(DataType.TUPLE);
poPackage.setNumInps(count);
poPackage.setInner(innerFlags);
return poPackage;
}
-
- private POForEach compileFE4Flattening(boolean[] innerFlags,String scope,
+
+ private POForEach compileFE4Flattening(boolean[] innerFlags,String scope,
int parallel, String alias, SourceLocation location, List<Operator> inputs)
throws FrontendException {
-
+
List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
List<Boolean> flattenLst = new ArrayList<Boolean>();
POForEach fe;
@@ -1259,8 +1509,8 @@ public class LogToPhyTranslationVisitor
}
flattenLst.add(true);
}
-
- fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
+
+ fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
parallel, fePlans, flattenLst );
fe.addOriginalLocation(alias, location);
@@ -1293,7 +1543,7 @@ public class LogToPhyTranslationVisitor
}
}
}
-
+
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
String scope = DEFAULT_SCOPE;
@@ -1313,7 +1563,7 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visit(LOLimit loLimit) throws FrontendException {
String scope = DEFAULT_SCOPE;
@@ -1348,10 +1598,10 @@ public class LogToPhyTranslationVisitor
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
+
translateSoftLinks(loLimit);
}
-
+
@Override
public void visit(LOSplit loSplit) throws FrontendException {
String scope = DEFAULT_SCOPE;
@@ -1384,9 +1634,9 @@ public class LogToPhyTranslationVisitor
currentPlan.add(physOp);
- List<Operator> op = loSplit.getPlan().getPredecessors(loSplit);
+ List<Operator> op = loSplit.getPlan().getPredecessors(loSplit);
PhysicalOperator from;
-
+
if(op != null) {
from = logToPhyMap.get(op.get(0));
} else {
@@ -1403,11 +1653,11 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
String scope = DEFAULT_SCOPE;
-// System.err.println("Entering Filter");
+ // System.err.println("Entering Filter");
POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), loSplitOutput.getRequestedParallelism());
poFilter.addOriginalLocation(loSplitOutput.getAlias(), loSplitOutput.getLocation());
@@ -1441,7 +1691,7 @@ public class LogToPhyTranslationVisitor
String msg = "Did not find a predecessor for Filter." ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
}
-
+
try {
currentPlan.connect(from, poFilter);
} catch (PlanException e) {
@@ -1449,9 +1699,9 @@ public class LogToPhyTranslationVisitor
String msg = "Invalid physical operators in the physical plan" ;
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
+
translateSoftLinks(loSplitOutput);
-// System.err.println("Exiting Filter");
+ // System.err.println("Exiting Filter");
}
/**
* updates plan with check for empty bag and if bag is empty to flatten a bag
@@ -1464,7 +1714,7 @@ public class LogToPhyTranslationVisitor
LogicalSchema inputSchema = null;
try {
inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
-
+
if(inputSchema == null) {
int errCode = 1109;
String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +
@@ -1476,9 +1726,9 @@ public class LogToPhyTranslationVisitor
String msg = "Error while determining the schema of input";
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
-
+
CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema));
-
+
}
private void translateSoftLinks(Operator op) throws FrontendException {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Thu Sep 13 14:55:36 2012
@@ -33,7 +33,7 @@ public abstract class LogicalRelationalN
protected LogicalRelationalNodesVisitor(OperatorPlan plan, PlanWalker walker) throws FrontendException {
super(plan, walker);
-
+
Iterator<Operator> iter = plan.getOperators();
while(iter.hasNext()) {
if (!(iter.next() instanceof LogicalRelationalOperator)) {
@@ -41,58 +41,61 @@ public abstract class LogicalRelationalN
}
}
}
-
+
public void visit(LOLoad load) throws FrontendException {
}
public void visit(LOFilter filter) throws FrontendException {
}
-
+
public void visit(LOStore store) throws FrontendException {
}
-
+
public void visit(LOJoin join) throws FrontendException {
}
-
+
public void visit(LOForEach foreach) throws FrontendException {
}
-
+
public void visit(LOGenerate gen) throws FrontendException {
}
-
+
public void visit(LOInnerLoad load) throws FrontendException {
}
public void visit(LOCube cube) throws FrontendException {
}
-
+
public void visit(LOCogroup loCogroup) throws FrontendException {
}
-
+
public void visit(LOSplit loSplit) throws FrontendException {
}
-
+
public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
}
-
+
public void visit(LOUnion loUnion) throws FrontendException {
}
-
+
public void visit(LOSort loSort) throws FrontendException {
}
-
+
+ public void visit(LORank loRank) throws FrontendException{
+ }
+
public void visit(LODistinct loDistinct) throws FrontendException {
}
-
+
public void visit(LOLimit loLimit) throws FrontendException {
}
-
+
public void visit(LOCross loCross) throws FrontendException {
}
-
+
public void visit(LOStream loStream) throws FrontendException {
}
- public void visit(LONative nativeMR) throws FrontendException{
+ public void visit(LONative nativeMR) throws FrontendException{
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Thu Sep 13 14:55:36 2012
@@ -43,6 +43,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -62,16 +63,16 @@ import org.apache.pig.newplan.logical.re
*/
public class ColumnPruneHelper {
protected static final String INPUTUIDS = "ColumnPrune:InputUids";
- public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
+ public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
-
+
private OperatorPlan currentPlan;
private OperatorSubPlan subPlan;
public ColumnPruneHelper(OperatorPlan currentPlan) {
this.currentPlan = currentPlan;
- }
-
+ }
+
private OperatorSubPlan getSubPlan() throws FrontendException {
OperatorSubPlan p = null;
if (currentPlan instanceof OperatorSubPlan) {
@@ -80,35 +81,35 @@ public class ColumnPruneHelper {
p = new OperatorSubPlan(currentPlan);
}
Iterator<Operator> iter = currentPlan.getOperators();
-
+
while(iter.hasNext()) {
Operator op = iter.next();
if (op instanceof LOForEach) {
addOperator(op, p);
}
}
-
+
return p;
}
-
+
private void addOperator(Operator op, OperatorSubPlan subplan) throws FrontendException {
if (op == null) {
return;
}
-
+
subplan.add(op);
-
+
List<Operator> ll = currentPlan.getPredecessors(op);
if (ll == null) {
return;
}
-
+
for(Operator pred: ll) {
addOperator(pred, subplan);
}
}
-
-
+
+
@SuppressWarnings("unchecked")
public boolean check() throws FrontendException {
List<Operator> sources = currentPlan.getSources();
@@ -117,14 +118,14 @@ public class ColumnPruneHelper {
clearAnnotation();
return false;
}
-
+
// create sub-plan that ends with foreach
subPlan = getSubPlan();
if (subPlan.size() == 0) {
clearAnnotation();
return false;
}
-
+
ColumnDependencyVisitor v = new ColumnDependencyVisitor(currentPlan);
try {
v.visit();
@@ -133,7 +134,7 @@ public class ColumnPruneHelper {
clearAnnotation();
return false;
}
-
+
List<Operator> ll = subPlan.getSources();
boolean found = false;
for(Operator op: ll) {
@@ -141,20 +142,20 @@ public class ColumnPruneHelper {
Set<Long> uids = (Set<Long>)op.getAnnotation(INPUTUIDS);
LogicalSchema s = ((LOLoad) op).getSchema();
Set<Integer> required = getColumns(s, uids);
-
+
if (required.size() < s.size()) {
- op.annotate(REQUIREDCOLS, required);
+ op.annotate(REQUIREDCOLS, required);
found = true;
}
}
}
-
+
if (!found)
clearAnnotation();
-
+
return found;
}
-
+
private void clearAnnotation() {
Iterator<Operator> iter = currentPlan.getOperators();
while (iter.hasNext()) {
@@ -170,7 +171,7 @@ public class ColumnPruneHelper {
if (schema == null) {
throw new SchemaNotDefinedException("Schema is not defined.");
}
-
+
Set<Integer> cols = new HashSet<Integer>();
Iterator<Long> iter = uids.iterator();
while(iter.hasNext()) {
@@ -179,32 +180,32 @@ public class ColumnPruneHelper {
if (index == -1) {
throw new FrontendException("UID " + uid + " is not found in the schema " + schema, 2241);
}
-
+
cols.add(index);
}
-
+
return cols;
}
-
+
public OperatorPlan reportChanges() {
return subPlan;
}
-
+
// Visitor to calculate the input and output uids for each operator
// It doesn't change the plan, only put calculated info as annotations
// The input and output uids are not necessarily the top level uids of
// a schema. They may be the uids of lower level fields of complex fields
// that have their own schema.
- static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {
-
+ static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {
+
public ColumnDependencyVisitor(OperatorPlan plan) throws FrontendException {
- super(plan, new ReverseDependencyOrderWalker(plan));
+ super(plan, new ReverseDependencyOrderWalker(plan));
}
-
+
@Override
public void visit(LOLoad load) throws FrontendException {
Set<Long> output = setOutputUids(load);
-
+
// for load, input uids are same as output uids
load.annotate(INPUTUIDS, output);
}
@@ -212,71 +213,71 @@ public class ColumnPruneHelper {
@Override
public void visit(LOFilter filter) throws FrontendException {
Set<Long> output = setOutputUids(filter);
-
+
// the input uids contains all the output uids and
// projections in filter conditions
Set<Long> input = new HashSet<Long>(output);
-
+
LogicalExpressionPlan exp = filter.getFilterPlan();
collectUids(filter, exp, input);
-
+
filter.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOStore store) throws FrontendException {
- Set<Long> output = setOutputUids(store);
-
+ Set<Long> output = setOutputUids(store);
+
if (output.isEmpty()) {
// to deal with load-store-load-store case
LogicalSchema s = store.getSchema();
if (s == null) {
throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
}
-
+
for(int i=0; i<s.size(); i++) {
output.add(s.getField(i).uid);
- }
- }
-
+ }
+ }
+
// for store, input uids are same as output uids
store.annotate(INPUTUIDS, output);
}
-
+
@Override
public void visit(LOJoin join) throws FrontendException {
Set<Long> output = setOutputUids(join);
-
+
// the input uids contains all the output uids and
// projections in join expressions
Set<Long> input = new HashSet<Long>(output);
-
+
Collection<LogicalExpressionPlan> exps = join.getExpressionPlanValues();
Iterator<LogicalExpressionPlan> iter = exps.iterator();
while(iter.hasNext()) {
LogicalExpressionPlan exp = iter.next();
collectUids(join, exp, input);
}
-
+
join.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOCogroup cg) throws FrontendException {
Set<Long> output = setOutputUids(cg);
-
+
// the input uids contains all the output uids and
// projections in join expressions
Set<Long> input = new HashSet<Long>();
-
+
// Add all the uids required for doing cogroup. As in all the
// keys on which the cogroup is done.
for( LogicalExpressionPlan plan : cg.getExpressionPlans().values() ) {
collectUids(cg, plan, input);
}
-
+
// Now check for the case where the output uid is a generated one
- // If that is the case we need to add the uids which generated it in
+ // If that is the case we need to add the uids which generated it in
// the input
long firstUid=-1;
Map<Integer,Long> generatedInputUids = cg.getGeneratedInputUids();
@@ -291,34 +292,34 @@ public class ColumnPruneHelper {
if (pred.getSchema()!=null)
firstUid = pred.getSchema().getField(0).uid;
}
-
+
if (input.isEmpty() && firstUid!=-1) {
input.add(firstUid);
}
-
+
cg.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOLimit limit) throws FrontendException {
Set<Long> output = setOutputUids(limit);
-
+
// the input uids contains all the output uids and
// projections in limit expression
Set<Long> input = new HashSet<Long>(output);
-
+
LogicalExpressionPlan exp = limit.getLimitPlan();
if (exp != null)
collectUids(limit, exp, input);
-
+
limit.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOStream stream) throws FrontendException {
// output is not used, setOutputUids is used to check if it has output schema
Set<Long> output = setOutputUids(stream);
-
+
// Every field is required
LogicalRelationalOperator pred = (LogicalRelationalOperator)plan.getPredecessors(stream).get(0);
@@ -326,23 +327,23 @@ public class ColumnPruneHelper {
stream.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LODistinct distinct) throws FrontendException {
Set<Long> input = new HashSet<Long>();
-
+
// Every field is required
LogicalSchema s = distinct.getSchema();
if (s == null) {
throw new SchemaNotDefinedException("Schema for " + distinct.getName() + " is not defined.");
}
-
+
for(int i=0; i<s.size(); i++) {
input.add(s.getField(i).uid);
- }
+ }
distinct.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOCross cross) throws FrontendException {
Set<Long> output = setOutputUids(cross);
@@ -362,7 +363,7 @@ public class ColumnPruneHelper {
}
cross.annotate(INPUTUIDS, output);
}
-
+
@Override
public void visit(LOUnion union) throws FrontendException {
Set<Long> output = setOutputUids(union);
@@ -372,54 +373,67 @@ public class ColumnPruneHelper {
}
union.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOSplit split) throws FrontendException {
Set<Long> output = setOutputUids(split);
split.annotate(INPUTUIDS, output);
}
-
+
@Override
public void visit(LOSplitOutput splitOutput) throws FrontendException {
Set<Long> output = setOutputUids(splitOutput);
-
+
// the input uids contains all the output uids and
// projections in splitOutput conditions
Set<Long> input = new HashSet<Long>();
-
+
for (long uid : output) {
input.add(splitOutput.getInputUids(uid));
}
-
+
LogicalExpressionPlan exp = splitOutput.getFilterPlan();
collectUids(splitOutput, exp, input);
-
+
splitOutput.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOSort sort) throws FrontendException {
Set<Long> output = setOutputUids(sort);
-
+
Set<Long> input = new HashSet<Long>(output);
-
+
for (LogicalExpressionPlan exp : sort.getSortColPlans()) {
collectUids(sort, exp, input);
}
-
+
sort.annotate(INPUTUIDS, input);
}
-
+
+ @Override
+ public void visit(LORank rank) throws FrontendException {
+ Set<Long> output = setOutputUids(rank);
+
+ Set<Long> input = new HashSet<Long>(output);
+
+ for (LogicalExpressionPlan exp : rank.getRankColPlans()) {
+ collectUids(rank, exp, input);
+ }
+
+ rank.annotate(INPUTUIDS, input);
+ }
+
/*
* This function returns all uids present in the given schema
*/
- private Set<Long> getAllUids( LogicalSchema schema ) {
+ private Set<Long> getAllUids( LogicalSchema schema ) {
Set<Long> uids = new HashSet<Long>();
-
+
if( schema == null ) {
return uids;
}
-
+
for( LogicalFieldSchema field : schema.getFields() ) {
if( ( field.type == DataType.TUPLE || field.type == DataType.BAG )
&& field.schema != null ) {
@@ -429,19 +443,19 @@ public class ColumnPruneHelper {
}
return uids;
}
-
+
@SuppressWarnings("unchecked")
@Override
public void visit(LOForEach foreach) throws FrontendException {
Set<Long> output = setOutputUids(foreach);
-
+
LOGenerate gen = OptimizerUtils.findGenerate(foreach);
gen.annotate(OUTPUTUIDS, output);
-
+
visit(gen);
-
+
Set<Long> input = (Set<Long>)gen.getAnnotation(INPUTUIDS);
-
+
// Make sure at least one column will retain
if (input.isEmpty()) {
LogicalRelationalOperator pred = (LogicalRelationalOperator)plan.getPredecessors(foreach).get(0);
@@ -455,11 +469,11 @@ public class ColumnPruneHelper {
@SuppressWarnings("unchecked")
public void visit(LOGenerate gen) throws FrontendException {
Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
-
+
Set<Long> input = new HashSet<Long>();
-
+
List<LogicalExpressionPlan> ll = gen.getOutputPlans();
-
+
Iterator<Long> iter = output.iterator();
while(iter.hasNext()) {
long uid = iter.next();
@@ -473,7 +487,7 @@ public class ColumnPruneHelper {
break;
}
}
-
+
if (found) {
List<Operator> srcs = exp.getSinks();
for (Operator src : srcs) {
@@ -500,7 +514,7 @@ public class ColumnPruneHelper {
}
}
}
-
+
// for the flatten bag, we need to make sure at least one field is in the input
for(int i=0; i<ll.size(); i++) {
if (!gen.getFlattenFlags()[i]) {
@@ -535,13 +549,13 @@ public class ColumnPruneHelper {
}
gen.annotate(INPUTUIDS, input);
}
-
+
@Override
public void visit(LOInnerLoad load) throws FrontendException {
Set<Long> output = setOutputUids(load);
load.annotate(INPUTUIDS, output);
}
-
+
private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws FrontendException {
List<Operator> ll = exp.getSinks();
for(Operator op: ll) {
@@ -562,20 +576,20 @@ public class ColumnPruneHelper {
}
}
}
-
+
@SuppressWarnings("unchecked")
// Get output uid from output schema. If output schema does not exist,
// throw exception
private Set<Long> setOutputUids(LogicalRelationalOperator op) throws FrontendException {
-
+
List<Operator> ll = plan.getSuccessors(op);
Set<Long> uids = new HashSet<Long>();
-
+
LogicalSchema s = op.getSchema();
if (s == null) {
throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
}
-
+
if (ll != null) {
// if this is not sink, the output uids are union of input uids of its successors
for(Operator succ: ll) {
@@ -584,7 +598,7 @@ public class ColumnPruneHelper {
Iterator<Long> iter = inputUids.iterator();
while(iter.hasNext()) {
long uid = iter.next();
-
+
if (s.findField(uid) != -1) {
uids.add(uid);
}
@@ -592,12 +606,12 @@ public class ColumnPruneHelper {
}
}
} else {
- // if it's leaf, set to its schema
+ // if it's leaf, set to its schema
for(int i=0; i<s.size(); i++) {
uids.add(s.getField(i).uid);
- }
- }
-
+ }
+ }
+
op.annotate(OUTPUTUIDS, uids);
return uids;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Thu Sep 13 14:55:36 2012
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -70,20 +71,20 @@ public class ColumnPruneVisitor extends
this.columnPrune = columnPrune;
this.requiredItems = requiredItems;
}
-
+
public void addRequiredItems(LOLoad load, Pair<Map<Integer,Set<String>>,Set<Integer>> requiredItem) {
requiredItems.put(load, requiredItem);
}
-
+
@Override
public void visit(LOLoad load) throws FrontendException {
if(! requiredItems.containsKey( load ) ) {
return;
}
-
- Pair<Map<Integer,Set<String>>,Set<Integer>> required =
+
+ Pair<Map<Integer,Set<String>>,Set<Integer>> required =
requiredItems.get(load);
-
+
RequiredFieldList requiredFields = new RequiredFieldList();
LogicalSchema s = load.getSchema();
@@ -109,15 +110,15 @@ public class ColumnPruneVisitor extends
requiredField = new RequiredField();
requiredField.setIndex(i);
requiredField.setAlias(s.getField(i).alias);
- requiredField.setType(s.getField(i).type);
+ requiredField.setType(s.getField(i).type);
requiredFields.add(requiredField);
}
}
-
+
boolean[] columnRequired = new boolean[s.size()];
for (RequiredField rf : requiredFields.getFields())
columnRequired[rf.getIndex()] = true;
-
+
List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
for (int i=0;i<columnRequired.length;i++)
{
@@ -136,7 +137,7 @@ public class ColumnPruneVisitor extends
}
log.info(message);
}
-
+
message = new StringBuffer();
for(RequiredField rf: requiredFields.getFields()) {
List<RequiredField> sub = rf.getSubFields();
@@ -146,71 +147,71 @@ public class ColumnPruneVisitor extends
}
if (message.length()!=0)
log.info(message);
-
+
LoadPushDown.RequiredFieldResponse response = null;
try {
LoadFunc loadFunc = load.getLoadFunc();
if (loadFunc instanceof LoadPushDown) {
response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
}
-
+
} catch (FrontendException e) {
log.warn("pushProjection on "+load+" throw an exception, skip it");
- }
-
- // Loader does not support column pruning, insert foreach
+ }
+
+ // Loader does not support column pruning, insert foreach
if (columnPrune) {
if (response==null || !response.getRequiredFieldResponse()) {
- LogicalPlan p = (LogicalPlan)load.getPlan();
- Operator next = p.getSuccessors(load).get(0);
- // if there is already a LOForEach after load, we don't need to
+ LogicalPlan p = (LogicalPlan)load.getPlan();
+ Operator next = p.getSuccessors(load).get(0);
+ // if there is already a LOForEach after load, we don't need to
// add another LOForEach
if (next instanceof LOForEach) {
return;
}
-
+
LOForEach foreach = new LOForEach(load.getPlan());
-
- // add foreach to the base plan
+
+ // add foreach to the base plan
p.add(foreach);
-
+
p.insertBetween(load, foreach, next);
-
+
LogicalPlan innerPlan = new LogicalPlan();
foreach.setInnerPlan(innerPlan);
-
+
// build foreach inner plan
- List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+ List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
innerPlan.add(gen);
-
+
for (int i=0; i<requiredFields.getFields().size(); i++) {
LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
- LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
- innerPlan.add(innerLoad);
+ LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
+ innerPlan.add(innerLoad);
innerPlan.connect(innerLoad, gen);
-
+
LogicalExpressionPlan exp = new LogicalExpressionPlan();
ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
exp.add(prj);
exps.add(exp);
- }
-
+ }
+
} else {
// columns are pruned, reset schema for LOLoader
List<Integer> requiredIndexes = new ArrayList<Integer>();
List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
- for (int i=0; i<fieldList.size(); i++) {
+ for (int i=0; i<fieldList.size(); i++) {
requiredIndexes.add(fieldList.get(i).getIndex());
}
load.setRequiredFields(requiredIndexes);
-
+
LogicalSchema newSchema = new LogicalSchema();
- for (int i=0; i<fieldList.size(); i++) {
+ for (int i=0; i<fieldList.size(); i++) {
newSchema.addField(s.getField(fieldList.get(i).getIndex()));
}
-
+
load.setSchema(newSchema);
}
}
@@ -219,15 +220,15 @@ public class ColumnPruneVisitor extends
@Override
public void visit(LOFilter filter) throws FrontendException {
}
-
+
@Override
public void visit(LOLimit limit) throws FrontendException {
}
-
+
@Override
public void visit(LOSplitOutput splitOutput) throws FrontendException {
}
-
+
@SuppressWarnings("unchecked")
@Override
public void visit(LOSplit split) throws FrontendException {
@@ -235,15 +236,15 @@ public class ColumnPruneVisitor extends
for (int i=0;i<branchOutputs.size();i++) {
Operator branchOutput = branchOutputs.get(i);
Set<Long> branchOutputUids = (Set<Long>)branchOutput.getAnnotation(ColumnPruneHelper.INPUTUIDS);
-
+
if (branchOutputUids!=null) {
Set<Integer> columnsToDrop = new HashSet<Integer>();
-
+
for (int j=0;j<split.getSchema().size();j++) {
if (!branchOutputUids.contains(split.getSchema().getField(j).uid))
columnsToDrop.add(j);
}
-
+
if (!columnsToDrop.isEmpty()) {
LOForEach foreach = Util.addForEachAfter((LogicalPlan)split.getPlan(), split, i, columnsToDrop);
foreach.getSchema();
@@ -251,38 +252,42 @@ public class ColumnPruneVisitor extends
}
}
}
-
+
@Override
public void visit(LOSort sort) throws FrontendException {
}
-
+
+ @Override
+ public void visit(LORank rank) throws FrontendException {
+ }
+
@Override
public void visit(LOStore store) throws FrontendException {
}
-
+
@Override
public void visit( LOCogroup cg ) throws FrontendException {
addForEachIfNecessary(cg);
}
-
+
@Override
public void visit(LOJoin join) throws FrontendException {
}
-
+
@Override
public void visit(LOCross cross) throws FrontendException {
}
-
+
@Override
@SuppressWarnings("unchecked")
public void visit(LOForEach foreach) throws FrontendException {
if (!columnPrune) {
return;
}
-
+
// get column numbers from input uids
Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
-
+
// Get all top level projects
LogicalPlan innerPlan = foreach.getInnerPlan();
List<LOInnerLoad> innerLoads= new ArrayList<LOInnerLoad>();
@@ -291,7 +296,7 @@ public class ColumnPruneVisitor extends
if (s instanceof LOInnerLoad)
innerLoads.add((LOInnerLoad)s);
}
-
+
// If project of the innerLoad is not in INPUTUIDS, remove this innerLoad
Set<LOInnerLoad> innerLoadsToRemove = new HashSet<LOInnerLoad>();
for (LOInnerLoad innerLoad: innerLoads) {
@@ -308,7 +313,7 @@ public class ColumnPruneVisitor extends
innerLoadsToRemove.add(innerLoad);
}
}
-
+
// Find the logical operator immediate precede LOGenerate which should be removed (the whole branch)
Set<LogicalRelationalOperator> branchHeadToRemove = new HashSet<LogicalRelationalOperator>();
for (LOInnerLoad innerLoad : innerLoadsToRemove) {
@@ -318,16 +323,16 @@ public class ColumnPruneVisitor extends
}
branchHeadToRemove.add((LogicalRelationalOperator)op);
}
-
+
// Find the expression plan to remove
LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
List<LogicalExpressionPlan> genPlansToRemove = new ArrayList<LogicalExpressionPlan>();
-
+
List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
for (int i=0;i<genPlans.size();i++) {
LogicalExpressionPlan expPlan = genPlans.get(i);
List<Operator> expSources = expPlan.getSinks();
-
+
for (Operator expSrc : expSources) {
if (expSrc instanceof ProjectExpression) {
LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
@@ -337,7 +342,7 @@ public class ColumnPruneVisitor extends
}
}
}
-
+
// Build the temporary structure based on genPlansToRemove, which include:
// * flattenList
// * outputPlanSchemas
@@ -352,10 +357,10 @@ public class ColumnPruneVisitor extends
List<LogicalSchema> outputPlanSchemas = new ArrayList<LogicalSchema>();
List<LogicalSchema> uidOnlySchemas = new ArrayList<LogicalSchema>();
List<LogicalSchema> userDefinedSchemas = null;
-
+
if (gen.getUserDefinedSchema()!=null)
userDefinedSchemas = new ArrayList<LogicalSchema>();
-
+
for (int i=0;i<genPlans.size();i++) {
LogicalExpressionPlan genPlan = genPlans.get(i);
if (!genPlansToRemove.contains(genPlan)) {
@@ -373,17 +378,17 @@ public class ColumnPruneVisitor extends
}
}
}
-
+
List<Operator> preds = innerPlan.getPredecessors(gen);
-
+
if (preds!=null) { // otherwise, all gen plan are based on constant, no need to adjust
for (int i=0;i<preds.size();i++) {
if (!inputsNeeded.contains(i))
inputsRemoved.add(i);
}
}
-
-
+
+
// Change LOGenerate: remove unneeded output expression plan
// change flatten flag, outputPlanSchema, uidOnlySchemas
boolean[] flatten = new boolean[flattenList.size()];
@@ -394,11 +399,11 @@ public class ColumnPruneVisitor extends
gen.setOutputPlanSchemas(outputPlanSchemas);
gen.setUidOnlySchemas(uidOnlySchemas);
gen.setUserDefinedSchema(userDefinedSchemas);
-
+
for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
genPlans.remove(genPlanToRemove);
}
-
+
// shift project input
if (!inputsRemoved.isEmpty()) {
for (LogicalExpressionPlan genPlan : genPlans) {
@@ -416,7 +421,7 @@ public class ColumnPruneVisitor extends
}
}
}
-
+
// Prune unneeded LOInnerLoad
List<LogicalRelationalOperator> predToRemove = new ArrayList<LogicalRelationalOperator>();
for (int i : inputsRemoved) {
@@ -426,18 +431,18 @@ public class ColumnPruneVisitor extends
removeSubTree(pred);
}
}
-
+
@Override
public void visit(LOUnion union) throws FrontendException {
// AddForEach before union if necessary.
List<Operator> preds = new ArrayList<Operator>();
preds.addAll(plan.getPredecessors(union));
-
+
for (Operator pred : preds) {
addForEachIfNecessary((LogicalRelationalOperator)pred);
}
}
-
+
// remove all the operators starting from an operator
private void removeSubTree(LogicalRelationalOperator op) throws FrontendException {
LogicalPlan p = (LogicalPlan)op.getPlan();
@@ -447,14 +452,14 @@ public class ColumnPruneVisitor extends
removeSubTree((LogicalRelationalOperator)pred);
}
}
-
+
if (p.getSuccessors(op) != null) {
- Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
+ Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
for(Operator s: succs) {
p.disconnect(op, s);
}
}
-
+
p.remove(op);
}
@@ -465,16 +470,16 @@ public class ColumnPruneVisitor extends
if (outputUids!=null) {
LogicalSchema schema = op.getSchema();
Set<Integer> columnsToDrop = new HashSet<Integer>();
-
+
for (int i=0;i<schema.size();i++) {
if (!outputUids.contains(schema.getField(i).uid))
columnsToDrop.add(i);
}
-
+
if (!columnsToDrop.isEmpty()) {
LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, 0, columnsToDrop);
foreach.getSchema();
}
}
- }
+ }
}