You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC
svn commit: r982345 [1/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Author: daijy
Date: Wed Aug 4 17:46:42 2010
New Revision: 982345
URL: http://svn.apache.org/viewvc?rev=982345&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-5.patch)
Added:
hadoop/pig/trunk/src/org/apache/pig/newplan/
hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AddExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AllSameExpressionVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AndExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BagDereferenceExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinaryExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ColumnExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DivideExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/EqualExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/GreaterThanEqualExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/GreaterThanExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/IsNullExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LessThanEqualExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LessThanExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ModExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/MultiplyExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NegativeExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NotEqualExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NotExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/OrExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/RegexExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java
hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/
hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java
hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java
hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java
hadoop/pig/trunk/test/newlogicalplan-tests
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune2.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java
Removed:
hadoop/pig/trunk/src/org/apache/pig/experimental/
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java
hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java
Modified:
hadoop/pig/trunk/build.xml
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Wed Aug 4 17:46:42 2010
@@ -80,6 +80,7 @@
<property name="test.timeout" value="2700000" />
<property name="test.junit.output.format" value="plain" />
<property name="test.commit.file" value="${test.src.dir}/commit-tests"/>
+ <property name="test.newlogicalplan.file" value="${test.src.dir}/newlogicalplan-tests"/>
<property name="test.unit.file" value="${test.src.dir}/unit-tests"/>
<property name="test.smoke.file" value="${test.src.dir}/smoke-tests"/>
<property name="test.all.file" value="${test.src.dir}/all-tests"/>
@@ -546,6 +547,10 @@
<macro-test-runner test.file="${test.commit.file}" />
</target>
+ <target name="test-newlogicalplan" depends="compile-test,jar-withouthadoop" description="Run the unit tests listed in test/newlogicalplan-tests for the new logical plan">
+ <macro-test-runner test.file="${test.newlogicalplan.file}" />
+ </target>
+
<target name="test-unit" depends="compile-test,jar-withouthadoop" description="Run all true unit tests">
<macro-test-runner test.file="${test.unit.file}" />
</target>
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Aug 4 17:46:42 2010
@@ -59,9 +59,6 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.experimental.logical.optimizer.UidStamper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.InterStorage;
@@ -95,6 +92,8 @@ import org.apache.pig.impl.streaming.Str
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.GruntParser;
@@ -881,12 +880,8 @@ public class PigServer {
if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("true") ) {
LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
migrator.visit();
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
- // set uids
- UidStamper stamper = new UidStamper(newPlan);
- stamper.visit();
-
LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(newPlan, 3);
optimizer.optimize();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Aug 4 17:46:42 2010
@@ -50,8 +50,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.optimizer.UidStamper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -59,6 +57,8 @@ import org.apache.pig.impl.io.InterStora
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -235,20 +235,19 @@ public class HExecutionEngine {
// translate old logical plan to new plan
LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
visitor.visit();
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
- // set uids
- UidStamper stamper = new UidStamper(newPlan);
- stamper.visit();
+ SchemaResetter schemaResetter = new SchemaResetter(newPlan);
+ schemaResetter.visit();
// run optimizer
- org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer optimizer =
- new org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer(newPlan, 100);
+ org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer =
+ new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100);
optimizer.optimize();
// translate new logical plan to physical plan
- org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor translator =
- new org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor(newPlan);
+ org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor translator =
+ new org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(newPlan);
translator.setPigContext(pigContext);
translator.visit();
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Base implementation of {@link OperatorPlan}. Operators are kept in a set;
+ * edges are kept in two {@link PlanEdge} maps: {@code fromEdges} maps an
+ * operator to its successors and {@code toEdges} maps an operator to its
+ * predecessors. The lists of sources and sinks are computed lazily and
+ * cached until the plan is next modified.
+ */
+public abstract class BaseOperatorPlan implements OperatorPlan {
+
+    protected Set<Operator> ops;
+    protected PlanEdge fromEdges;
+    protected PlanEdge toEdges;
+
+    // Lazily computed caches of sources/sinks; cleared by markDirty()
+    // whenever the plan structure changes through this class's mutators.
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    protected static final Log log =
+        LogFactory.getLog(BaseOperatorPlan.class);
+
+    public BaseOperatorPlan() {
+        ops = new HashSet<Operator>();
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        fromEdges = new PlanEdge();
+        toEdges = new PlanEdge();
+    }
+
+    /**
+     * Get number of nodes in the plan.
+     */
+    public int size() {
+        return ops.size();
+    }
+
+    /**
+     * Get all operators in the plan that have no predecessors.
+     * <p>
+     * The returned list is this plan's internal cache: it is cleared the next
+     * time the plan is modified, so callers should neither modify it nor hold
+     * on to it across plan changes.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSources() {
+        if (roots.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                if (toEdges.get(op) == null) {
+                    roots.add(op);
+                }
+            }
+        }
+        return roots;
+    }
+
+    /**
+     * Get all operators in the plan that have no successors.
+     * <p>
+     * The returned list is this plan's internal cache: it is cleared the next
+     * time the plan is modified, so callers should neither modify it nor hold
+     * on to it across plan changes.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSinks() {
+        if (leaves.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                if (fromEdges.get(op) == null) {
+                    leaves.add(op);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op, or null if no
+     *         predecessors are recorded for op (which is how
+     *         {@link #getSources()} identifies a root). The list is the
+     *         plan's internal edge list; do not modify it.
+     * @throws IOException declared by {@link OperatorPlan}; this
+     *         implementation does not itself check that op is in the plan.
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        return (List<Operator>)toEdges.get(op);
+    }
+
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op, or null if no
+     *         successors are recorded for op (which is how
+     *         {@link #getSinks()} identifies a leaf). The list is the plan's
+     *         internal edge list; do not modify it.
+     * @throws IOException declared by {@link OperatorPlan}; this
+     *         implementation does not itself check that op is in the plan.
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        return (List<Operator>)fromEdges.get(op);
+    }
+
+    /**
+     * Add a new operator to the plan. It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    public void add(Operator op) {
+        markDirty();
+        ops.add(op);
+    }
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to
+     * remove an operator that is still connected to other operators.
+     */
+    public void remove(Operator op) throws IOException {
+        if (fromEdges.containsKey(op) || toEdges.containsKey(op)) {
+            throw new IOException("Attempt to remove operator " + op.getName()
+                + " that is still connected in the plan");
+        }
+        markDirty();
+        ops.remove(op);
+    }
+
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists that the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    public void connect(Operator from,
+                        int fromPos,
+                        Operator to,
+                        int toPos) {
+        markDirty();
+        fromEdges.put(from, to, fromPos);
+        toEdges.put(to, from, toPos);
+    }
+
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    public void connect(Operator from, Operator to) {
+        markDirty();
+        fromEdges.put(from, to);
+        toEdges.put(to, from);
+    }
+
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected, or if the
+     * edge is recorded in only one direction.
+     */
+    public Pair<Integer, Integer> disconnect(Operator from,
+                                             Operator to) throws IOException {
+        Pair<Operator, Integer> f = fromEdges.removeWithPosition(from, to);
+        if (f == null) {
+            throw new IOException("Attempt to disconnect operators " +
+                from.getName() + " and " + to.getName() +
+                " which are not connected.");
+        }
+
+        // fromEdges has already changed, so the cached sources/sinks are stale
+        // from here on, even if the toEdges removal below fails and throws.
+        markDirty();
+
+        Pair<Operator, Integer> t = toEdges.removeWithPosition(to, from);
+        if (t == null) {
+            throw new IOException("Plan in inconssistent state " +
+                from.getName() + " and " + to.getName() +
+                " connected in fromEdges but not toEdges.");
+        }
+
+        return new Pair<Integer, Integer>(f.second, t.second);
+    }
+
+    /**
+     * Invalidate the cached sources and sinks so they are recomputed on the
+     * next call to {@link #getSources()} / {@link #getSinks()}.
+     */
+    private void markDirty() {
+        roots.clear();
+        leaves.clear();
+    }
+
+    /**
+     * @return an iterator over every operator in the plan (no particular
+     *         order, since operators are stored in a HashSet).
+     */
+    public Iterator<Operator> getOperators() {
+        return ops.iterator();
+    }
+
+    public boolean isEqual(OperatorPlan other) {
+        return isEqual(this, other);
+    }
+
+    /**
+     * Recursively compare the predecessor chains of two operators, position
+     * by position, using {@link Operator#isEqual}.
+     * IOExceptions from the plans are rethrown as RuntimeExceptions.
+     */
+    private static boolean checkPredecessors(Operator op1,
+                                             Operator op2) {
+        try {
+            List<Operator> preds = op1.getPlan().getPredecessors(op1);
+            List<Operator> otherPreds = op2.getPlan().getPredecessors(op2);
+            if (preds == null && otherPreds == null) {
+                // intentionally blank
+            } else if (preds == null || otherPreds == null) {
+                return false;
+            } else {
+                if (preds.size() != otherPreds.size()) return false;
+                for (int i = 0; i < preds.size(); i++) {
+                    Operator p1 = preds.get(i);
+                    Operator p2 = otherPreds.get(i);
+                    if (!p1.isEqual(p2)) return false;
+                    if (!checkPredecessors(p1, p2)) return false;
+                }
+            }
+            return true;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Structural comparison of two plans: both must have the same number of
+     * sinks, and every sink of p1 must match some sink of p2 whose
+     * predecessor chain is also equal.
+     * NOTE(review): matching is not one-to-one; two sinks of p1 may both be
+     * matched by the same sink of p2.
+     */
+    protected static boolean isEqual(OperatorPlan p1, OperatorPlan p2) {
+        if (p1 == p2) {
+            return true;
+        }
+        if (p1 == null || p2 == null) {
+            return false;
+        }
+
+        List<Operator> leaves = p1.getSinks();
+        List<Operator> otherLeaves = p2.getSinks();
+        if (leaves.size() != otherLeaves.size()) return false;
+        // Must find some leaf that is equal to each leaf. There is no
+        // guarantee leaves will be returned in any particular order.
+        for (Operator op1 : leaves) {
+            boolean matched = false;
+            for (Operator op2 : otherLeaves) {
+                if (op1.isEqual(op2) && checkPredecessors(op1, op2)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Print a description of the plan. This base implementation prints
+     * nothing; subclasses may override it.
+     */
+    public void explain(PrintStream ps, String format, boolean verbose) throws IOException {
+    }
+
+    /**
+     * @return the output of {@link #explain} with an empty format and
+     *         verbose off, or "" if explain throws an IOException.
+     */
+    @Override
+    public String toString() {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(os);
+        try {
+            explain(ps,"",false);
+        } catch (IOException e) {
+            return "";
+        }
+        return os.toString();
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Walks a plan in dependency (topological) order: an operator is never
+ * visited until every one of its predecessors has been visited.
+ */
+public class DependencyOrderWalker extends PlanWalker {
+
+    /**
+     * @param plan the plan this walker traverses.
+     */
+    public DependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DependencyOrderWalker(plan);
+    }
+
+    /**
+     * Visit every operator of the plan, predecessors first.
+     * @param visitor visitor each operator is handed to.
+     * @throws IOException if fetching predecessors or visiting fails.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        // Build the visiting order first with a depth-first search that starts
+        // at each sink and works backwards: an operator is appended only after
+        // all of its predecessors have been appended. The visited set ensures
+        // no operator is appended twice. Then visit in that order.
+        List<Operator> sinks = plan.getSinks();
+        if (sinks == null) {
+            return;
+        }
+
+        Set<Operator> visited = new HashSet<Operator>();
+        List<Operator> order = new ArrayList<Operator>();
+        for (Operator sink : sinks) {
+            doAllPredecessors(sink, visited, order);
+        }
+
+        for (Operator current : order) {
+            current.accept(visitor);
+        }
+    }
+
+    /**
+     * Append {@code node} to {@code fifo} after recursively appending all of
+     * its not-yet-seen predecessors. Nodes already in {@code seen} are skipped.
+     * NOTE(review): a node is only marked seen after its predecessors are
+     * processed, so a cyclic plan would recurse without bound.
+     * @param node operator to place in the ordering.
+     * @param seen operators already placed.
+     * @param fifo ordering being built.
+     * @throws IOException if fetching predecessors fails.
+     */
+    protected void doAllPredecessors(Operator node,
+                                     Set<Operator> seen,
+                                     Collection<Operator> fifo) throws IOException {
+        if (seen.contains(node)) {
+            return;
+        }
+
+        Collection<Operator> predecessors = plan.getPredecessors(node);
+        if (predecessors != null) {
+            for (Operator pred : predecessors) {
+                doAllPredecessors(pred, seen, fifo);
+            }
+        }
+
+        // Every predecessor is now placed, so this node can follow them.
+        seen.add(node);
+        fifo.add(node);
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class DepthFirstMemoryWalker extends DepthFirstWalker {
+
+ private int level = 0;
+ private int startingLevel = 0;
+ private Stack<String> prefixStack;
+ private String currentPrefix = "";
+
+ public DepthFirstMemoryWalker(OperatorPlan plan, int startingLevel) {
+ super(plan);
+ level = startingLevel;
+ this.startingLevel = startingLevel;
+ prefixStack = new Stack<String>();
+ }
+
+ @Override
+ public PlanWalker spawnChildWalker(OperatorPlan plan) {
+ return new DepthFirstMemoryWalker(plan, level);
+ }
+
+ /**
+ * Begin traversing the graph.
+ * @param visitor Visitor this walker is being used by.
+ * @throws IOException if an error is encountered while walking.
+ */
+ @Override
+ public void walk(PlanVisitor visitor) throws IOException {
+ List<Operator> roots = plan.getSources();
+ Set<Operator> seen = new HashSet<Operator>();
+
+ depthFirst(null, roots, seen, visitor);
+ }
+
+ public String getPrefix() {
+ return currentPrefix;
+ }
+
+ private void depthFirst(Operator node,
+ Collection<Operator> successors,
+ Set<Operator> seen,
+ PlanVisitor visitor) throws IOException {
+ if (successors == null) return;
+
+ StringBuilder strb = new StringBuilder();
+ for(int i = 0; i < startingLevel; i++ ) {
+ strb.append("|\t");
+ }
+ if( ((level-1) - startingLevel ) >= 0 )
+ strb.append("\t");
+ for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+ strb.append("|\t");
+ }
+ strb.append( "|\n" );
+ for(int i = 0; i < startingLevel; i++ ) {
+ strb.append("|\t");
+ }
+ if( ((level-1) - startingLevel ) >= 0 )
+ strb.append("\t");
+ for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+ strb.append("|\t");
+ }
+ strb.append("|---");
+ currentPrefix = strb.toString();
+
+ for (Operator suc : successors) {
+ if (seen.add(suc)) {
+ suc.accept(visitor);
+ Collection<Operator> newSuccessors = plan.getSuccessors(suc);
+ level++;
+ prefixStack.push(currentPrefix);
+ depthFirst(suc, newSuccessors, seen, visitor);
+ level--;
+ currentPrefix = prefixStack.pop();
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Walks a plan depth first. Starting from each source in turn, an operator
+ * is handed to the visitor, and its successors are walked before its next
+ * sibling. An operator reachable along several paths is visited only the
+ * first time it is reached.
+ */
+public class DepthFirstWalker extends PlanWalker {
+
+    public DepthFirstWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DepthFirstWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        Collection<Operator> sources = plan.getSources();
+        Set<Operator> visited = new HashSet<Operator>();
+        visitUnseen(sources, visited, visitor);
+    }
+
+    /**
+     * Visit every operator in {@code ops} that has not been visited yet.
+     * Recursion into an operator's successors happens immediately after that
+     * operator is visited.
+     * @param ops operators to consider; null means there is nothing to do
+     * @param visited operators already handed to the visitor
+     * @param visitor visitor receiving each operator
+     * @throws IOException if visiting or fetching successors fails
+     */
+    private void visitUnseen(Collection<Operator> ops,
+                             Set<Operator> visited,
+                             PlanVisitor visitor) throws IOException {
+        if (ops == null) {
+            return;
+        }
+        for (Operator op : ops) {
+            if (!visited.add(op)) {
+                continue; // already reached along another path
+            }
+            op.accept(visitor);
+            visitUnseen(plan.getSuccessors(op), visited, visitor);
+        }
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for a node in an {@link OperatorPlan}. An operator has a name,
+ * a reference to the plan that contains it, and a map of string-keyed
+ * annotations that can be attached, read and removed.
+ */
+public abstract class Operator {
+
+    /** Name of this operator. */
+    protected String name;
+
+    /** Plan that contains this operator. */
+    protected OperatorPlan plan;
+
+    /** Key/value annotations attached to this operator. */
+    protected Map<String, Object> annotations;
+
+    // Not read within this class; presumably for subclasses' hash
+    // computations - confirm before changing.
+    protected final int hashPrime = 31;
+
+    /**
+     * @param n name of the operator
+     * @param p plan that contains the operator
+     */
+    public Operator(String n, OperatorPlan p) {
+        this.annotations = new HashMap<String, Object>();
+        this.name = n;
+        this.plan = p;
+    }
+
+    /**
+     * Accept a visitor at this node in the graph.
+     * @param v Visitor to accept.
+     * @throws IOException if the visitor fails while visiting this node
+     */
+    public abstract void accept(PlanVisitor v) throws IOException;
+
+    /**
+     * @return the name of this operator
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Get the plan associated with this operator.
+     * @return plan containing this operator
+     */
+    public OperatorPlan getPlan() {
+        return this.plan;
+    }
+
+    /**
+     * Attach an annotation to this operator, replacing any previous value
+     * stored under the same key.
+     * @param key string name of this annotation
+     * @param val value, as an Object
+     */
+    public void annotate(String key, Object val) {
+        this.annotations.put(key, val);
+    }
+
+    /**
+     * Look up an annotation on this operator.
+     * @param key string name of annotation to look for
+     * @return value of the annotation, as an Object, or null if the key is
+     * not present in the map.
+     */
+    public Object getAnnotation(String key) {
+        return this.annotations.get(key);
+    }
+
+    /**
+     * Remove an annotation from this operator.
+     * @param key the key of the annotation
+     * @return the value the annotation had, or null if it was not present
+     */
+    public Object removeAnnotation(String key) {
+        return this.annotations.remove(key);
+    }
+
+    /**
+     * Shallow equivalence check. Returns true when two operators have
+     * equivalent properties, meaning an equivalent plan and an equivalent
+     * name, even if they are different objects.
+     * @param operator operator to compare against
+     * @return true if the two operators have equivalent properties, else false
+     */
+    public abstract boolean isEqual(Operator operator);
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A directed graph of {@link Operator}s. For any operator, the operators
+ * immediately before it are its predecessors, and the operators immediately
+ * after it are its successors.
+ */
+public interface OperatorPlan {
+
+    /**
+     * Get the number of operators in the plan.
+     * @return number of operators in the plan
+     */
+    int size();
+
+    /**
+     * Get all operators in the plan that have no predecessors.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    List<Operator> getSources();
+
+    /**
+     * Get all operators in the plan that have no successors.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    List<Operator> getSinks();
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op. If op is a root,
+     * implementations may return either null or an empty list
+     * (OperatorSubPlan returns null), so callers must handle both.
+     * @throws IOException if op is not in the plan.
+     */
+    List<Operator> getPredecessors(Operator op) throws IOException;
+
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op. If op is a leaf,
+     * implementations may return either null or an empty list
+     * (OperatorSubPlan returns null), so callers must handle both.
+     * @throws IOException if op is not in the plan.
+     */
+    List<Operator> getSuccessors(Operator op) throws IOException;
+
+    /**
+     * Add a new operator to the plan. It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    void add(Operator op);
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to
+     * remove an operator that is still connected to other operators.
+     * Views over another plan (OperatorSubPlan) do not perform this check.
+     */
+    void remove(Operator op) throws IOException;
+
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists the from and to edges are placed at.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     * @throws UnsupportedOperationException if the plan is a view that does
+     * not own its edges, such as OperatorSubPlan.
+     */
+    void connect(Operator from, int fromPos, Operator to, int toPos);
+
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     * @throws UnsupportedOperationException if the plan is a view that does
+     * not own its edges, such as OperatorSubPlan.
+     */
+    void connect(Operator from, Operator to);
+
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected.
+     * @throws UnsupportedOperationException if the plan is a view that does
+     * not own its edges, such as OperatorSubPlan.
+     */
+    Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException;
+
+    /**
+     * Get an iterator of all operators in this plan.
+     * @return an iterator of all operators in this plan
+     */
+    Iterator<Operator> getOperators();
+
+    /**
+     * Shallow structural comparison. Two plans are equal if they have
+     * equivalent operators and equivalent structure.
+     * @param other plan to compare against
+     * @return true if both plans are equivalent, false otherwise
+     */
+    boolean isEqual(OperatorPlan other);
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Class to represent a view of a plan. The view contains a subset of the plan.
+ * All the operators returned from the view are the same objects to the operators
+ * in its base plan. It is used to represent match results.
+ *
+ */
+public class OperatorSubPlan implements OperatorPlan {
+
+ private OperatorPlan basePlan;
+ private List<Operator> roots;
+ private List<Operator> leaves;
+ private Set<Operator> operators;
+
+ public OperatorSubPlan(OperatorPlan base) {
+ basePlan = base;
+ roots = new ArrayList<Operator>();
+ leaves = new ArrayList<Operator>();
+ operators = new HashSet<Operator>();
+ }
+
+ public OperatorPlan getBasePlan() {
+ return basePlan;
+ }
+
+ public void add(Operator op) {
+ operators.add(op);
+ leaves.clear();
+ roots.clear();
+ }
+
+ public void connect(Operator from, int fromPos, Operator to, int toPos) {
+ throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+ }
+
+ public void connect(Operator from, Operator to) {
+ throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+ }
+
+ public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+ throw new UnsupportedOperationException("disconnect() can not be called on OperatorSubPlan");
+ }
+
+ public List<Operator> getSinks() {
+ if (leaves.size() == 0 && operators.size() > 0) {
+ for (Operator op : operators) {
+ try {
+ if (getSuccessors(op) == null) {
+ leaves.add(op);
+ }
+ }catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return leaves;
+ }
+
+ public Iterator<Operator> getOperators() {
+ return operators.iterator();
+ }
+
+ public List<Operator> getPredecessors(Operator op) throws IOException {
+ List<Operator> l = basePlan.getPredecessors(op);
+ List<Operator> list = null;
+ if (l != null) {
+ for(Operator oper: l) {
+ if (operators.contains(oper)) {
+ if (list == null) {
+ list = new ArrayList<Operator>();
+ }
+ list.add(oper);
+ }
+ }
+ }
+
+ return list;
+ }
+
+ public List<Operator> getSources() {
+ if (roots.size() == 0 && operators.size() > 0) {
+ for (Operator op : operators) {
+ try {
+ if (getPredecessors(op) == null) {
+ roots.add(op);
+ }
+ }catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return roots;
+ }
+
+ public List<Operator> getSuccessors(Operator op) throws IOException {
+ List<Operator> l = basePlan.getSuccessors(op);
+ List<Operator> list = null;
+ if (l != null) {
+ for(Operator oper: l) {
+ if (operators.contains(oper)) {
+ if (list == null) {
+ list = new ArrayList<Operator>();
+ }
+ list.add(oper);
+ }
+ }
+ }
+
+ return list;
+ }
+
+ public void remove(Operator op) throws IOException {
+ operators.remove(op);
+ leaves.clear();
+ roots.clear();
+ }
+
+ public int size() {
+ return operators.size();
+ }
+
+ @Override
+ public boolean isEqual(OperatorPlan other) {
+ return BaseOperatorPlan.isEqual(this, other);
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A MultiMap from Operator to Operator whose per-key value lists are ordered.
+ * Values can be inserted at, and removed from, a specific position.
+ */
+public class PlanEdge extends MultiMap<Operator, Operator> {
+
+    private static final long serialVersionUID = 1L;
+
+    public PlanEdge() {
+        super();
+    }
+
+    /**
+     * @param size Initial size of the map
+     */
+    public PlanEdge(int size) {
+        super(size);
+    }
+
+    /**
+     * Add an element to the map at a given position.
+     * @param key The key to store the value under. If the key already
+     * exists the value will be added to the collection for that key, it
+     * will not replace the existing value (as in a standard map).
+     * @param value value to store.
+     * @param pos position in the arraylist to store the new value at.
+     * Positions are zero based, so the first value stored under a key must
+     * use position 0.
+     * @throws IndexOutOfBoundsException if the key has no values yet and pos
+     * is not 0, or if pos is out of range for the key's existing list.
+     */
+    public void put(Operator key, Operator value, int pos) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list == null) {
+            // Validate before allocating anything for the new key.
+            if (pos != 0) {
+                throw new IndexOutOfBoundsException(
+                    "First edge must have position 0, but was " + pos);
+            }
+            list = new ArrayList<Operator>();
+            list.add(value);
+            mMap.put(key, list);
+        } else {
+            list.add(pos, value);
+        }
+    }
+
+    /**
+     * Remove one value from an existing key and return the position in the
+     * arraylist the value was at. If that is the last value for the key,
+     * then remove the key too.
+     * @param key Key to remove the value from.
+     * @param value Value to remove.
+     * @return A pair containing the value being removed and an integer
+     * indicating the position, or null if the key or value does
+     * not exist. Positions are zero based.
+     */
+    public Pair<Operator, Integer> removeWithPosition(Operator key,
+                                                      Operator value) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list == null) {
+            return null;
+        }
+        for (int index = 0; index < list.size(); index++) {
+            Operator candidate = list.get(index);
+            // Identity comparison: only the exact object passed in matches,
+            // not an operator that merely compares equal.
+            if (candidate == value) {
+                list.remove(index);
+                if (list.isEmpty()) {
+                    mMap.remove(key);
+                }
+                return new Pair<Operator, Integer>(candidate, index);
+            }
+        }
+        return null;
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Stack;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism for navigating and operating on a plan of
+ * Operators. This class contains the logic to traverse the plan. It does
+ * not visit individual nodes. That is left to implementing classes
+ * (such as LOVisitor).
+ * <p>
+ * Traversal is delegated to a PlanWalker. pushWalker() and popWalker() let
+ * a visitor temporarily switch to another walker, for example one spawned
+ * to walk a subplan, and then return to the previous one.
+ */
+public abstract class PlanVisitor {
+
+    // TODO Remove this scope value
+    protected static final String DEFAULT_SCOPE = "scope";
+
+    protected OperatorPlan plan;
+
+    /**
+     * Guaranteed to always point to the walker currently being used.
+     */
+    protected PlanWalker currentWalker;
+
+    /** Walkers saved by pushWalker(); the most recently saved is on top. */
+    private final Stack<PlanWalker> walkers;
+
+    /**
+     * @param plan OperatorPlan this visitor will visit.
+     * @param walker PlanWalker this visitor will use to traverse the plan.
+     */
+    protected PlanVisitor(OperatorPlan plan, PlanWalker walker) {
+        this.plan = plan;
+        currentWalker = walker;
+        walkers = new Stack<PlanWalker>();
+    }
+
+    /**
+     * Entry point for visiting the plan. It runs the current walker with
+     * this visitor.
+     * @throws IOException if an error is encountered while visiting.
+     */
+    public void visit() throws IOException {
+        currentWalker.walk(this);
+    }
+
+    /**
+     * @return the plan this visitor was created for
+     */
+    public OperatorPlan getPlan() {
+        return plan;
+    }
+
+    /**
+     * Push the current walker onto the stack of saved walkers and begin using
+     * the newly passed walker as the current walker.
+     * @param walker new walker to set as the current walker.
+     */
+    protected void pushWalker(PlanWalker walker) {
+        walkers.push(currentWalker);
+        currentWalker = walker;
+    }
+
+    /**
+     * Pop the most recently saved walker off the stack and make it the
+     * current walker. This drops the reference to the current walker.
+     * @throws VisitorException if there are no more walkers on the stack. In
+     * this case the current walker is not reset.
+     */
+    protected void popWalker() throws VisitorException {
+        if (walkers.empty()) {
+            throw new VisitorException("No more walkers to pop.");
+        }
+        currentWalker = walkers.pop();
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+
+/**
+ * Traversal strategy for an {@link OperatorPlan}. A {@link PlanVisitor} runs
+ * a walker through {@link #walk(PlanVisitor)}, and the concrete walker decides
+ * the order in which the plan's operators are handed to the visitor.
+ */
+public abstract class PlanWalker {
+
+    protected OperatorPlan plan;
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public PlanWalker(OperatorPlan plan) {
+        this.plan = plan;
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by. This can't be set in
+     * the constructor because the visitor is constructing this class, and does
+     * not yet have a 'this' pointer to send as an argument.
+     * @throws IOException if an error is encountered while walking.
+     */
+    public abstract void walk(PlanVisitor visitor) throws IOException;
+
+    /**
+     * Return a new instance of this same type of walker for a subplan.
+     * When this method is called, a walker of the same type, with the
+     * provided plan set as its plan, must be returned. This can then be
+     * used to walk subplans. It allows abstract visitors to clone
+     * walkers without knowing the type of walker their subclasses used.
+     * @param plan Plan for the new walker.
+     * @return Instance of the same type of walker with plan set to plan.
+     */
+    public abstract PlanWalker spawnChildWalker(OperatorPlan plan);
+
+    /**
+     * @return the plan this walker traverses
+     */
+    public OperatorPlan getPlan() {
+        return plan;
+    }
+
+    /**
+     * Set the plan for this walker to operate on.
+     * @param plan to walk
+     */
+    public void setPlan(OperatorPlan plan) {
+        this.plan = plan;
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Visit a plan in the reverse of the dependency order. Every node is visited
+ * only after every node that depends on it (its successors) has been
+ * visited. This is equivalent to doing a reverse topological sort on the
+ * graph and then visiting it in order.
+ */
+public class ReverseDependencyOrderWalker extends PlanWalker {
+
+    public ReverseDependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new ReverseDependencyOrderWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        // Starting from each source, doAllSuccessors() finishes all of a
+        // node's successors before appending the node itself, so the list
+        // ends up in reverse dependency order. The seen set ensures a node
+        // reachable along several paths is appended (and expanded) only once.
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+        List<Operator> roots = plan.getSources();
+        if (roots == null) {
+            return;
+        }
+        for (Operator op : roots) {
+            doAllSuccessors(op, seen, fifo);
+        }
+
+        for (Operator op : fifo) {
+            op.accept(visitor);
+        }
+    }
+
+    /**
+     * Append {@code node} to {@code fifo} after first appending, recursively,
+     * all of its successors that have not been seen yet.
+     * <p>
+     * A node is added to {@code seen} only after its successors are done,
+     * so the plan is assumed to be acyclic; a cycle would recurse without end.
+     * @param node operator to process
+     * @param seen operators already appended to fifo
+     * @param fifo output collection, filled in reverse dependency order
+     * @throws IOException if fetching successors fails
+     */
+    protected void doAllSuccessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws IOException {
+        if (seen.contains(node)) {
+            return; // already handled via another path
+        }
+        Collection<Operator> succs = plan.getSuccessors(node);
+        if (succs != null) {
+            // Do all our successors before ourself.
+            for (Operator op : succs) {
+                doAllSuccessors(op, seen, fifo);
+            }
+        }
+        // Now do ourself.
+        seen.add(node);
+        fifo.add(node);
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Walks only part of a plan. walk() collects operators with the inherited
+ * doAllPredecessors(), starting from startNode (per the original comment,
+ * all predecessors of startNode). It then visits them in the order
+ * doAllPredecessors() produced.
+ * NOTE(review): spawnChildWalker() is inherited from DependencyOrderWalker,
+ * so child walkers are presumably not SubtreeDependencyOrderWalkers -
+ * confirm that is intended.
+ */
+public class SubtreeDependencyOrderWalker extends DependencyOrderWalker {
+    // Operator the walk starts from. Left null by the one-argument
+    // constructor. NOTE(review): walk() passes it unchecked to
+    // doAllPredecessors() - confirm how a null start node is handled there.
+    private final Operator startNode;
+
+    public SubtreeDependencyOrderWalker(OperatorPlan plan) {
+        this(plan, null);
+    }
+
+    public SubtreeDependencyOrderWalker(OperatorPlan plan, Operator startNode) {
+        super(plan);
+        this.startNode = startNode;
+    }
+
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        Set<Operator> seen = new HashSet<Operator>();
+        List<Operator> ordered = new ArrayList<Operator>();
+
+        // get all predecessors of startNode
+        doAllPredecessors(startNode, seen, ordered);
+
+        for (Operator op : ordered) {
+            op.accept(visitor);
+        }
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.RelationalOperator;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+
+// visitor to translate the inner plan of foreach
+// it contains methods to translate all the operators that are allowed
+// in the inner plan of foreach
+public class ForeachInnerPlanVisitor extends LogicalExpPlanMigrationVistor {
+ private org.apache.pig.newplan.logical.relational.LogicalPlan newInnerPlan;
+ private LOForEach oldForeach;
+ private org.apache.pig.newplan.logical.relational.LogicalRelationalOperator gen;
+ private int inputNo;
+ private HashMap<LogicalOperator, LogicalRelationalOperator> innerOpsMap;
+
+ public ForeachInnerPlanVisitor(org.apache.pig.newplan.logical.relational.LOForEach foreach, LOForEach oldForeach, LogicalPlan innerPlan,
+ LogicalPlan oldLogicalPlan) {
+ super(innerPlan, foreach, oldLogicalPlan);
+ newInnerPlan = foreach.getInnerPlan();
+
+ // get next inputNo
+ gen = (org.apache.pig.newplan.logical.relational.LogicalRelationalOperator)
+ newInnerPlan.getSinks().get(0);
+ try {
+ inputNo = 0;
+ List<org.apache.pig.newplan.Operator> suc = newInnerPlan.getPredecessors(gen);
+ if (suc != null) {
+ inputNo = suc.size();
+ }
+ }catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.oldForeach = oldForeach;
+
+ innerOpsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+ }
+
+ private void translateInnerPlanConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) throws IOException {
+ List<LogicalOperator> preds = mPlan.getPredecessors(oldOp);
+
+ if(preds != null) {
+ for(LogicalOperator pred: preds) {
+ org.apache.pig.newplan.Operator newPred = innerOpsMap.get(pred);
+ if (newPred.getPlan().getSuccessors(newPred)!=null) {
+ org.apache.pig.newplan.Operator newSucc = newOp.getPlan().getSuccessors(newPred).get(0);
+ Pair<Integer, Integer> pair = newOp.getPlan().disconnect(newPred, newSucc);
+ newOp.getPlan().connect(newPred, newOp);
+ newOp.getPlan().connect(newOp, pair.first, newSucc, pair.second);
+ }
+ else {
+ newOp.getPlan().connect(newPred, newOp);
+ }
+ }
+ }
+ }
+
+ private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker =
+ new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
+
+ LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, outerPlan);
+
+ childWalker.walk(childPlanVisitor);
+ return childPlanVisitor.exprPlan;
+ }
+
+ public void visit(LOProject project) throws VisitorException {
+ LogicalOperator op = project.getExpression();
+
+ if (op == outerPlan.getPredecessors(oldForeach).get(0)) {
+ // if this projection is to get a field from outer plan, change it
+ // to LOInnerLoad
+
+ LOInnerLoad innerLoad = new LOInnerLoad(newInnerPlan,
+ (org.apache.pig.newplan.logical.relational.LOForEach)attachedRelationalOp,
+ project.isStar()?-1:project.getCol());
+
+ newInnerPlan.add(innerLoad);
+ innerOpsMap.put(project, innerLoad);
+
+ // The logical plan part for this foreach plan is done, add ProjectExpression
+ // into expression plan.
+
+ // The logical plan part is done, add this sub plan under LOGenerate,
+ // and prepare for the expression plan
+ newInnerPlan.connect(innerLoad, gen);
+
+ ProjectExpression pe = new ProjectExpression(exprPlan, inputNo++, -1, gen);
+ exprPlan.add(pe);
+ exprOpsMap.put(project, pe);
+ try {
+ translateInnerPlanConnection(project, pe);
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ // This case occurs when there are two projects one after another
+ // These projects in combination project a column (bag) out of a tuple
+ // and then project a column out of this projected bag
+ // Here we merge these two projects into one BagDereferenceExpression
+ else if( op instanceof LOProject ) {
+ LogicalExpression expOper = exprOpsMap.get(op);
+
+ if (expOper!=null) {
+ // Add the dereference in the plan
+ DereferenceExpression dereferenceExp = new DereferenceExpression(
+ exprPlan, project.getProjection());
+ exprOpsMap.put(project, dereferenceExp);
+ exprPlan.add(dereferenceExp);
+ exprPlan.connect(dereferenceExp, expOper);
+ }
+ } else {
+ if (op instanceof RelationalOperator && project.isSendEmptyBagOnEOP()) {
+ LogicalOperator currentOp = op;
+ while (currentOp instanceof RelationalOperator) {
+ List<LogicalOperator> preds = mPlan.getPredecessors(currentOp);
+ if (preds!=null)
+ currentOp = preds.get(0);
+ else break;
+ }
+ if (currentOp instanceof ExpressionOperator) {
+ LogicalExpression exp = exprOpsMap.get(currentOp);
+ if (exp!=null)
+ exprOpsMap.put(project, exp);
+ }
+ }
+ }
+ }
+
+ public void visit(LOSort sort) throws VisitorException {
+ List<LogicalPlan> sortPlans = sort.getSortColPlans();
+ List<LogicalExpressionPlan> newSortPlans = new ArrayList<LogicalExpressionPlan>();
+
+ org.apache.pig.newplan.logical.relational.LOSort newSort =
+ new org.apache.pig.newplan.logical.relational.LOSort(newInnerPlan,
+ newSortPlans, sort.getAscendingCols(), sort.getUserFunc());
+
+ newSort.setAlias(sort.getAlias());
+ newSort.setRequestedParallelism(sort.getRequestedParallelism());
+ newSort.setLimit(sort.getLimit());
+ newInnerPlan.add(newSort);
+ innerOpsMap.put(sort, newSort);
+ try {
+ translateInnerPlanConnection(sort, newSort);
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+
+ for (LogicalPlan sortPlan : sortPlans) {
+ LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, newSort, mPlan);
+ newSortPlans.add(newSortPlan);
+ }
+ }
+
+ public void visit(LOLimit limit) throws VisitorException {
+ org.apache.pig.newplan.logical.relational.LOLimit newLimit =
+ new org.apache.pig.newplan.logical.relational.LOLimit(newInnerPlan,
+ limit.getLimit());
+
+ newLimit.setAlias(limit.getAlias());
+ newLimit.setRequestedParallelism(limit.getRequestedParallelism());
+ newInnerPlan.add(newLimit);
+ innerOpsMap.put(limit, newLimit);
+ try {
+ translateInnerPlanConnection(limit, newLimit);
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ public void visit(LODistinct distinct) throws VisitorException {
+ org.apache.pig.newplan.logical.relational.LODistinct newDistinct =
+ new org.apache.pig.newplan.logical.relational.LODistinct(newInnerPlan);
+
+ newDistinct.setAlias(distinct.getAlias());
+ newDistinct.setRequestedParallelism(distinct.getRequestedParallelism());
+ newInnerPlan.add(newDistinct);
+ innerOpsMap.put(distinct, newDistinct);
+ try {
+ translateInnerPlanConnection(distinct, newDistinct);
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ public void visit(LOFilter filter) throws VisitorException {
+ org.apache.pig.newplan.logical.relational.LOFilter newFilter =
+ new org.apache.pig.newplan.logical.relational.LOFilter(newInnerPlan);
+
+ newFilter.setAlias(filter.getAlias());
+ newFilter.setRequestedParallelism(filter.getRequestedParallelism());
+ LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), newFilter, mPlan);
+ newFilter.setFilterPlan(newFilterPlan);
+ newInnerPlan.add(newFilter);
+ innerOpsMap.put(filter, newFilter);
+ try {
+ translateInnerPlanConnection(filter, newFilter);
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+}
\ No newline at end of file