You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC
svn commit: r982345 [8/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class PushUpFilter extends Rule {
+
+ public PushUpFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new PushUpFilterTransformer();
+ }
+
+ public class PushUpFilterTransformer extends Transformer {
+
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ // check if it is inner join
+ LOJoin join = (LOJoin)matched.getSources().get(0);
+ boolean[] innerFlags = join.getInnerFlags();
+ for(boolean inner: innerFlags) {
+ if (!inner){
+ return false;
+ }
+ }
+
+ Operator next = matched.getSinks().get(0);
+ while(next != null && next instanceof LOFilter) {
+ LOFilter filter = (LOFilter)next;
+ LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+
+ // collect all uids used in the filter plan
+ Set<Long> uids = new HashSet<Long>();
+ Iterator<Operator> iter = filterPlan.getOperators();
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof ProjectExpression) {
+ long uid = ((ProjectExpression)op).getFieldSchema().uid;
+ uids.add(uid);
+ }
+ }
+
+ List<Operator> preds = currentPlan.getPredecessors(join);
+
+ for(int j=0; j<preds.size(); j++) {
+ if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
+ return true;
+ }
+ }
+
+ // if current filter can not move up, check next filter
+ List<Operator> l = currentPlan.getSuccessors(filter);
+ if (l != null) {
+ next = l.get(0);
+ } else {
+ next = null;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ LOJoin join = (LOJoin)matched.getSources().get(0);
+ subPlan.add(join);
+
+ Operator next = matched.getSinks().get(0);
+ while(next != null && next instanceof LOFilter) {
+ LOFilter filter = (LOFilter)next;
+ subPlan.add(filter);
+
+ LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+
+ // collect all uids used in the filter plan
+ Set<Long> uids = new HashSet<Long>();
+ Iterator<Operator> iter = filterPlan.getOperators();
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof ProjectExpression) {
+ long uid = ((ProjectExpression)op).getFieldSchema().uid;
+ uids.add(uid);
+ }
+ }
+
+ // find the farthest predecessor that has all the fields
+ LogicalRelationalOperator input = join;
+ List<Operator> preds = currentPlan.getPredecessors(input);
+ while(preds != null) {
+ boolean found = false;
+ for(int j=0; j<preds.size(); j++) {
+ if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
+ input = (LogicalRelationalOperator)preds.get(j);
+ subPlan.add(input);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ break;
+ }
+ preds = currentPlan.getPredecessors(input);
+ }
+
+ if (input != join) {
+ Operator pred = currentPlan.getPredecessors(filter).get(0);
+ Operator succed = currentPlan.getSuccessors(filter).get(0);
+ subPlan.add(succed);
+
+ Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter);
+ Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(pred, p1.first, succed, p2.second);
+
+ succed = currentPlan.getSuccessors(input).get(0);
+ Pair<Integer, Integer> p3 = currentPlan.disconnect(input, succed);
+ currentPlan.connect(input, p3.first, filter, 0);
+ currentPlan.connect(filter, 0, succed, p3.second);
+
+ return;
+ }
+
+ List<Operator> l = currentPlan.getSuccessors(filter);
+ if (l != null) {
+ next = l.get(0);
+ } else {
+ next = null;
+ }
+ }
+ }
+
+ // check if a relational operator contains all of the specified uids
+ private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+ LogicalSchema schema = op.getSchema();
+ for(long uid: uids) {
+ if (schema.findField(uid) == -1) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is join -> filter
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op1 = new LOJoin(plan);
+ LogicalRelationalOperator op2 = new LOFilter(plan);
+ plan.add(op1);
+ plan.add(op2);
+ plan.connect(op1, op2);
+
+ return plan;
+ }
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class SplitFilter extends Rule {
+
+ public SplitFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new SplitFilterTransformer();
+ }
+
+ public class SplitFilterTransformer extends Transformer {
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (root instanceof AndExpression) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ // split one LOFilter into 2 by "AND"
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (!(root instanceof AndExpression)) {
+ return;
+ }
+ LogicalExpressionPlan op1 = new LogicalExpressionPlan();
+ op1.add((LogicalExpression)cond.getSuccessors(root).get(0));
+ fillSubPlan(cond, op1, (LogicalExpression)cond.getSuccessors(root).get(0));
+
+ LogicalExpressionPlan op2 = new LogicalExpressionPlan();
+ op2.add((LogicalExpression)cond.getSuccessors(root).get(1));
+ fillSubPlan(cond, op2, (LogicalExpression)cond.getSuccessors(root).get(1));
+
+ filter.setFilterPlan(op1);
+ LOFilter filter2 = new LOFilter((LogicalPlan)currentPlan, op2);
+ currentPlan.add(filter2);
+
+ Operator succed = null;
+ try {
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ if (succeds != null) {
+ succed = succeds.get(0);
+ subPlan.add(succed);
+ Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(filter2, 0, succed, p.second);
+ currentPlan.connect(filter, p.first, filter2, 0);
+ } else {
+ currentPlan.connect(filter, 0, filter2, 0);
+ }
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
+
+ subPlan.add(filter);
+ subPlan.add(filter2);
+ Iterator<Operator> iter = filter2.getFilterPlan().getOperators();
+ while (iter.hasNext()) {
+ Operator oper = iter.next();
+ if (oper instanceof ProjectExpression) {
+ ((ProjectExpression)oper).setAttachedRelationalOp(filter2);
+ }
+ }
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ private void fillSubPlan(OperatorPlan origPlan,
+ OperatorPlan subPlan, Operator startOp) throws IOException {
+
+ List<Operator> l = origPlan.getSuccessors(startOp);
+ if (l != null) {
+ for(Operator le: l) {
+ subPlan.add(le);
+ subPlan.connect(startOp, le);
+ fillSubPlan(origPlan, subPlan, le);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is filter
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op2 = new LOFilter(plan);
+ plan.add(op2);
+
+ return plan;
+ }
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * Optimizer rule that matches a load and, when the load's schema declares
+ * fields with a type other than bytearray, inserts a foreach directly after
+ * it that casts those fields to their declared types.
+ */
+public class TypeCastInserter extends Rule {
+
+    // fully qualified class name of the operator this rule instance targets
+    private String operatorClassName;
+
+    public TypeCastInserter(String n, String operatorClassName) {
+        super(n);
+        this.operatorClassName = operatorClassName;
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for is load
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOLoad(null, null, plan, null);
+        plan.add(op);
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new TypeCastInserterTransformer();
+    }
+
+    public class TypeCastInserterTransformer extends Transformer {
+
+        /**
+         * @param matched plan whose single source is the candidate load
+         * @return true if the load has a schema, has not had casts inserted
+         *         yet, and at least one of its fields needs a cast
+         */
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            LogicalRelationalOperator op = (LogicalRelationalOperator)matched.getSources().get(0);
+            LogicalSchema s = op.getSchema();
+            if (s == null) return false;
+
+            if (((LOLoad)op).isCastInserted()) return false;
+
+            boolean sawOne = false;
+            List<LogicalSchema.LogicalFieldSchema> fss = s.getFields();
+            LogicalSchema determinedSchema = determinedSchemaOf(op);
+            for (int i = 0; i < fss.size(); i++) {
+                if (needsCast(fss.get(i), determinedSchema, i)) {
+                    sawOne = true;
+                }
+            }
+
+            // If all we've found are byte arrays, we don't need a projection.
+            return sawOne;
+        }
+
+        /**
+         * Inserts a foreach between the matched operator and its first
+         * successor. The foreach generates one expression per schema field:
+         * cast(project) for fields that need a cast, plain project otherwise.
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            LogicalRelationalOperator op = (LogicalRelationalOperator)matched.getSources().get(0);
+            LogicalSchema s = op.getSchema();
+            // For every field, build a logical plan. If the field has a type
+            // other than byte array, then the plan will be cast(project). Else
+            // it will just be project.
+            LogicalPlan innerPlan = new LogicalPlan();
+
+            LOForEach foreach = new LOForEach(currentPlan);
+            foreach.setInnerPlan(innerPlan);
+            foreach.setAlias(op.getAlias());
+
+            // Insert the foreach into the plan and patch up the plan.
+            Operator next = currentPlan.getSuccessors(op).get(0);
+            Pair<Integer,Integer> disconnectedPos = currentPlan.disconnect(op, next);
+            currentPlan.add(foreach);
+            currentPlan.connect(op, disconnectedPos.first.intValue(), foreach, 0 );
+            currentPlan.connect(foreach, 0, next, disconnectedPos.second.intValue());
+
+            List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+            LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[s.size()]);
+            innerPlan.add(gen);
+
+            // if we are inserting casts in a load and if the loader
+            // implements determineSchema(), insert casts only where necessary
+            // Note that in this case, the data coming out of the loader is not
+            // a BYTEARRAY but is whatever determineSchema() says it is.
+            LogicalSchema determinedSchema = determinedSchemaOf(op);
+            for (int i = 0; i < s.size(); i++) {
+                LogicalSchema.LogicalFieldSchema fs = s.getField(i);
+
+                LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
+                innerPlan.add(innerLoad);
+                innerPlan.connect(innerLoad, gen);
+
+                LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+                ProjectExpression prj = new ProjectExpression(exp, i, 0, gen);
+                exp.add(prj);
+
+                // must be the same decision check() makes, otherwise the casts
+                // that made check() return true would never be inserted
+                if (needsCast(fs, determinedSchema, i)) {
+                    CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
+                    exp.add(cast);
+                    FuncSpec loadFuncSpec = null;
+                    if(op instanceof LOLoad) {
+                        loadFuncSpec = ((LOLoad)op).getFileSpec().getFuncSpec();
+                    } else if (op instanceof LOStream) {
+                        StreamingCommand command = ((LOStream)op).getStreamingCommand();
+                        HandleSpec streamOutputSpec = command.getOutputSpec();
+                        loadFuncSpec = new FuncSpec(streamOutputSpec.getSpec());
+                    } else {
+                        // report the operator that was actually matched
+                        String msg = "TypeCastInserter invoked with an invalid operator class name: " + op.getClass().getSimpleName();
+                        throw new IOException(msg);
+                    }
+                    cast.setFuncSpec(loadFuncSpec);
+                }
+                exps.add(exp);
+            }
+            ((LOLoad)op).setCastInserted(true);
+        }
+
+        // schema reported by the loader itself, or null when this rule does
+        // not target LOLoad
+        private LogicalSchema determinedSchemaOf(LogicalRelationalOperator op) throws IOException {
+            if (LOLoad.class.getName().equals(operatorClassName)) {
+                return ((LOLoad)op).getDeterminedSchema();
+            }
+            return null;
+        }
+
+        // A field needs a cast when its declared type is not bytearray and
+        // either no schema was determined by the loader or the field in the
+        // determined schema differs from the declared one.
+        private boolean needsCast(LogicalSchema.LogicalFieldSchema fs,
+                LogicalSchema determinedSchema, int i) throws IOException {
+            if (fs.type == DataType.BYTEARRAY) {
+                return false;
+            }
+            return determinedSchema == null || !fs.isEqual(determinedSchema.getField(i));
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return currentPlan;
+        }
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.Rule;
+
+/**
+ * Superclass for all rules that operate on the whole plan. Such a rule does
+ * not look for a specific pattern. An example of such a rule is ColumnPrune.
+ *
+ */
+public abstract class WholePlanRule extends Rule {
+
+    public WholePlanRule(String n) {
+        super(n);
+    }
+
+    /**
+     * Treats the entire plan as the one and only match.
+     *
+     * @param plan the plan to run this rule against; also recorded as the
+     *             current plan of this rule
+     * @return a new list whose single element is {@code plan}
+     */
+    public List<OperatorPlan> match(OperatorPlan plan) {
+        List<OperatorPlan> wholePlanMatch = new ArrayList<OperatorPlan>();
+        wholePlanMatch.add(plan);
+        currentPlan = plan;
+        return wholePlanMatch;
+    }
+
+    /**
+     * This kind of rule has no pattern to match against.
+     *
+     * @return always null
+     */
+    @Override
+    protected OperatorPlan buildPattern() {
+        return null;
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.newplan.OperatorPlan;
+
+/**
+ * The core class of the optimizer. The basic design of this class is that it
+ * is provided a list of RuleSets. RuleSets represent all of the optimizer
+ * rules that can be run together. The rules in the RuleSet will be run
+ * repeatedly until either no rule in the RuleSet passes check and calls
+ * transform or until maxIter iterations (default 500) has been made over
+ * the RuleSet. Then the next RuleSet will be moved to. Once finished,
+ * a given RuleSet is never returned to.
+ *
+ * Each rule has two parts: a pattern and an associated transformer.
+ * Transformers have two important functions: check(), and transform().
+ * The pattern describes a pattern of node types that the optimizer will
+ * look to match. If that match is found anywhere in the plan, then check()
+ * will be called. check() allows the rule to look more in depth at the
+ * matched pattern and decide whether the rule should be run or not. For
+ * example, one might design a rule to push filters above a join that would
+ * look for the pattern filter(join) (meaning a join whose output feeds a filter).
+ * But only certain types of filters can be pushed. The check() function
+ * would need to decide whether the filter that it found was pushable or not.
+ * If check() returns true, the rule is said to have matched, and transform()
+ * is then called. This function is responsible for making changes in the
+ * logical plan. Once transform is complete PlanPatcher.patchUp will be
+ * called to do any necessary cleanup in the plan, such as resetting
+ * schemas, etc.
+ */
+public abstract class PlanOptimizer {
+
+    protected List<Set<Rule>> ruleSets;
+    protected OperatorPlan plan;
+    protected List<PlanTransformListener> listeners;
+    protected int maxIter;
+
+    static final int defaultIterations = 500;
+
+    /**
+     * @param p Plan to optimize
+     * @param rs List of RuleSets to use to optimize
+     * @param iterations maximum number of optimization iterations per
+     * RuleSet; any value below 1 selects the default
+     */
+    protected PlanOptimizer(OperatorPlan p,
+            List<Set<Rule>> rs,
+            int iterations) {
+        plan = p;
+        ruleSets = rs;
+        listeners = new ArrayList<PlanTransformListener>();
+        if (iterations < 1) {
+            maxIter = defaultIterations;
+        } else {
+            maxIter = iterations;
+        }
+    }
+
+    /**
+     * Adds a listener to the optimization. Every registered listener is
+     * notified after each transformation of the plan, in the order the
+     * listeners were added.
+     * @param listener listener to register
+     */
+    protected void addPlanTransformListener(PlanTransformListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Run the optimizer. Each RuleSet is processed in turn: all of its rules
+     * are applied to the plan over and over until a full pass transforms
+     * nothing or maxIter passes have been made.
+     * @throws IOException if matching, checking or transforming fails
+     */
+    public void optimize() throws IOException {
+
+        for (Set<Rule> ruleSet : ruleSets) {
+            int passes = 0;
+            boolean changed;
+            do {
+                changed = false;
+                for (Rule rule : ruleSet) {
+                    if (applyRule(rule)) {
+                        changed = true;
+                    }
+                }
+            } while (changed && ++passes < maxIter);
+        }
+    }
+
+    /**
+     * Matches one rule against the plan. For every match, the check method of
+     * the rule's Transformer is called to give it a chance to decide whether
+     * it really wants to do the optimization; if it returns true, the match
+     * is transformed and the listeners are notified.
+     * @param rule rule to apply
+     * @return true if at least one match passed check and was transformed
+     * @throws IOException if matching, checking or transforming fails
+     */
+    private boolean applyRule(Rule rule) throws IOException {
+        List<OperatorPlan> matches = rule.match(plan);
+        if (matches == null) {
+            return false;
+        }
+
+        // one transformer instance serves all matches of this rule in this pass
+        Transformer transformer = rule.getNewTransformer();
+        boolean transformedAny = false;
+        for (OperatorPlan match : matches) {
+            if (!transformer.check(match)) {
+                continue;
+            }
+            transformedAny = true;
+            transformer.transform(match);
+            for (PlanTransformListener listener : listeners) {
+                listener.transformed(plan, transformer.reportChanges());
+            }
+        }
+        return transformedAny;
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.OperatorPlan;
+
+/**
+ * A listener class that patches up plans after they have been transformed.
+ */
+public interface PlanTransformListener {
+ /**
+ * Callback invoked after a plan has been transformed.
+ * @param fp the full plan that has been transformed
+ * @param tp a plan containing only the operators that have been transformed
+ * @throws IOException if the listener fails while processing the change
+ */
+ public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException;
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+
+/**
+ * Rules describe a pattern of operators. They also reference a Transformer.
+ * If the pattern of operators is found one or more times in the provided plan,
+ * then the optimizer will use the associated Transformer to transform the
+ * plan.
+ */
+public abstract class Rule {
+
+ protected String name = null;
+ protected OperatorPlan pattern;
+ transient protected OperatorPlan currentPlan;
+ protected static final Log log = LogFactory.getLog(Rule.class);
+ private transient Set<Operator> matchedNodes = new HashSet<Operator>();
+
+ /**
+ * Create this rule using the default pattern returned by buildPattern().
+ * @param n Name of this rule
+ */
+ public Rule(String n) {
+ name = n;
+ pattern = buildPattern();
+ }
+
+ /**
+ * @param n Name of this rule
+ * @param p Pattern to look for.
+ */
+ public Rule(String n, OperatorPlan p) {
+ name = n;
+ pattern = p;
+ }
+
+ /**
+ * Build the pattern that this rule will look for
+ * @return the pattern to look for by this rule
+ */
+ abstract protected OperatorPlan buildPattern();
+
+ /**
+ * Get the transformer for this rule. Abstract because the rule
+ * may want to choose how to instantiate the transformer.
+ * This should never return a cached transformer, it should
+ * always return a fresh one with no state.
+ * @return Transformer to use with this rule
+ */
+ abstract public Transformer getNewTransformer();
+
+ /**
+ * Return the pattern to be matched for this rule
+ * @return the pattern to be matched for this rule
+ */
+ public OperatorPlan getPattern() {
+ return pattern;
+ }
+
+ /**
+ * Search for all the sub-plans that match the pattern
+ * defined by this rule.
+ * @return A list of all matched sub-plans. The returned plans are
+ * partial views of the original OperatorPlan. Each is a
+ * sub-set of the original plan and represents the same
+ * topology as the pattern, but operators in the returned plan
+ * are the same objects as the original plan. Therefore,
+ * a call to getPlan() from any node in the returned plan would
+ * return the original plan.
+ * @throws RuntimeException if the plan fails to report an operator's neighbours
+ * @param plan the OperatorPlan to look for matches to the pattern
+ */
+ public List<OperatorPlan> match(OperatorPlan plan) {
+ currentPlan = plan;
+
+ List<Operator> leaves = pattern.getSinks();
+
+ Iterator<Operator> iter = plan.getOperators();
+ List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();
+ matchedNodes.clear();
+
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+
+ // find a node that matches the first leaf of the pattern
+ if (match(op, leaves.get(0))) {
+ List<Operator> planOps = new ArrayList<Operator>();
+ planOps.add(op);
+
+ // if there is more than one leaf in the pattern, we check
+ // if the other leaves match the siblings of this node
+ if (leaves.size()>1) {
+ boolean matched = true;
+
+
+ List<Operator> preds = null;
+ try {
+ preds = plan.getPredecessors(op);
+ }catch(IOException e) {
+ throw new RuntimeException(e); // not expected; do not silently treat op as a root
+ }
+
+ // if this node has no predecessor, it must be a root
+ if (preds == null) {
+ preds = new ArrayList<Operator>();
+ preds.add(null);
+ }
+ // try each predecessor; the first one whose successors line up with all the leaves wins
+ for(Operator s: preds) {
+ matched = true;
+ List<Operator> siblings = null;
+ try {
+ if (s != null) {
+ siblings = plan.getSuccessors(s);
+ }else{
+ // for a root, we get its siblings by getting all roots
+ siblings = plan.getSources();
+ }
+ }catch(IOException e) {
+ // not going to happen
+ throw new RuntimeException(e);
+ }
+ int index = siblings.indexOf(op);
+ if (siblings.size()-index < leaves.size()) {
+ continue;
+ }
+ // every remaining leaf must match the sibling at the same offset from op
+
+ for(int j=1; j<leaves.size(); j++) {
+ if (!match(siblings.get(index+j), leaves.get(j))) {
+ matched = false;
+ break;
+ }
+ }
+
+ if (matched) {
+ for(int j=1; j<leaves.size(); j++) {
+ planOps.add(siblings.get(index+j));
+ }
+ break;
+ }
+
+ }
+
+ // we have to move on to the next operator as this one doesn't have siblings to
+ // match all the leaves
+ if (!matched) {
+ continue;
+ }
+ }
+
+
+ PatternMatchOperatorPlan match = new PatternMatchOperatorPlan(plan);
+ try {
+ if (match.check(planOps)) {
+ // we find a matched pattern,
+ // add the operators into matchedNodes
+ Iterator<Operator> iter2 = match.getOperators();
+ while(iter2.hasNext()) {
+ Operator opt = iter2.next();
+ matchedNodes.add(opt);
+ }
+
+ // add pattern
+ matchedList.add(match);
+ }
+ }catch(IOException e) {
+ log.error("Failed to search for optmization pattern. ", e);
+ }
+ }
+ }
+
+ return matchedList;
+ }
+ /** @return the name of this rule */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Check if two operators match each other, we want to find matches
+ * that don't share nodes
+ */
+ private boolean match(Operator planNode, Operator patternNode) {
+ return planNode.getClass().equals(patternNode.getClass()) && !matchedNodes.contains(planNode);
+ }
+
+ /** Sub-plan view of the base plan that collects the operators matching this rule's pattern. */
+ private class PatternMatchOperatorPlan extends OperatorSubPlan {
+
+ public PatternMatchOperatorPlan(OperatorPlan basePlan) {
+ super(basePlan);
+ }
+ /** Match each start operator against the corresponding pattern sink; true only if the whole pattern is covered. */
+ protected boolean check(List<Operator> planOps) throws IOException {
+ List<Operator> patternOps = pattern.getSinks();
+ if (planOps.size() != patternOps.size()) {
+ return false;
+ }
+
+ for(int i=0; i<planOps.size(); i++) {
+ Stack<Operator> s = new Stack<Operator>();
+ if (!check(planOps.get(i), patternOps.get(i), s)) {
+ return false;
+ }
+ Iterator<Operator> iter = s.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ if (size() == pattern.size()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Check if the plan operator and its sub-tree has a match to the pattern
+ * operator and its sub-tree. This algorithm only searches for and returns one match.
+ * It doesn't recursively search for all possible matches. For example,
+ * for a plan that looks like
+ * join
+ * / \
+ * load load
+ * if we are looking for join->load pattern, only one match will be returned instead
+ * of two, so that the matched subsets don't share nodes.
+ */
+ private boolean check(Operator planOp, Operator patternOp, Stack<Operator> opers) throws IOException {
+ if (!match(planOp, patternOp)) {
+ return false;
+ }
+
+ // check if their predecessors match
+ List<Operator> preds1 = getBasePlan().getPredecessors(planOp);
+ List<Operator> preds2 = pattern.getPredecessors(patternOp);
+ if (preds1 == null && preds2 != null) {
+ return false;
+ }
+
+ if (preds1 != null && preds2 != null && preds1.size() < preds2.size()) {
+ return false;
+ }
+
+ // we've reached the root of the pattern, so a match is found
+ if (preds2 == null || preds2.size() == 0) {
+ opers.push(planOp);
+ return true;
+ }
+
+ int index = 0;
+ // slide over the plan predecessors looking for a run that matches the pattern predecessors in order
+ while(index < preds1.size()) {
+ boolean match = true;
+ if (match(preds1.get(index), preds2.get(0))) {
+ if ( (preds1.size() - index) < preds2.size()) {
+ return false;
+ }
+
+ int oldSize = opers.size();
+ for(int i=0; i<preds2.size(); i++) {
+ if (!check(preds1.get(index+i), preds2.get(i), opers)) {
+ for(int j=opers.size(); j>oldSize; j--) { // undo operators pushed by the failed attempt
+ opers.pop();
+ }
+ match = false;
+ break;
+ }
+ }
+ if (match) {
+ opers.push(planOp);
+ return true;
+ }
+ }
+ index++;
+ }
+
+ return false;
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.OperatorPlan;
+
+public abstract class Transformer {
+
+ /**
+ * Check if the transform should be done. If this is being called then
+ * the pattern matches, but there may be other criteria that must be met
+ * as well.
+ * @param matched the sub-set of the plan that matches the pattern. This
+ * subset has the same graph as the pattern, but the operators
+ * point to the same objects as the plan to be matched.
+ * @return true if the transform should be done.
+ * @throws IOException
+ */
+ public abstract boolean check(OperatorPlan matched) throws IOException;
+
+ /**
+ * Transform the tree
+ * @param matched the sub-set of the plan that matches the pattern. This
+ * subset has the same graph as the pattern, but the operators
+ * point to the same objects as the plan to be matched.
+ * @throws IOException
+ */
+ public abstract void transform(OperatorPlan matched) throws IOException;
+
+ /**
+ * Report what parts of the tree were transformed. This is so that
+ * listeners can know which part of the tree to visit and modify
+ * schemas, annotations, etc. Nodes that were removed will not be
+ * in this plan; it holds only nodes that were added or moved.
+ * @return OperatorPlan that describes just the changed nodes.
+ */
+ public abstract OperatorPlan reportChanges();
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Aug 4 17:46:42 2010
@@ -46,8 +46,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.experimental.plan.Operator;
-import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Wed Aug 4 17:46:42 2010
@@ -43,14 +43,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.InternalCachedBag;
-import org.apache.pig.experimental.plan.BaseOperatorPlan;
-import org.apache.pig.experimental.plan.Operator;
-import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.PlanVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.newplan.BaseOperatorPlan;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.JobStats.JobState;
/**
@@ -216,7 +216,7 @@ public final class PigStats {
protected JobGraphPrinter(OperatorPlan plan) {
super(plan,
- new org.apache.pig.experimental.plan.DependencyOrderWalker(
+ new org.apache.pig.newplan.DependencyOrderWalker(
plan));
buf = new StringBuffer();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Aug 4 17:46:42 2010
@@ -64,7 +64,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LODistinct;
@@ -84,6 +83,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.newplan.Operator;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
/**
Added: hadoop/pig/trunk/test/newlogicalplan-tests
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/newlogicalplan-tests?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/newlogicalplan-tests (added)
+++ hadoop/pig/trunk/test/newlogicalplan-tests Wed Aug 4 17:46:42 2010
@@ -0,0 +1,11 @@
+**/TestNewPlanColumnPrune.java
+**/TestNewPlanFilterAboveForeach.java
+**/TestNewPlanFilterRule.java
+**/TestNewPlanListener.java
+**/TestNewPlanLogicalOptimizer.java
+**/TestNewPlanLogToPhyTranslationVisitor.java
+**/TestNewPlanOperatorPlan.java
+**/TestNewPlanPruneMapKeys.java
+**/TestNewPlanRule.java
+**/TestLogicalPlanMigrationVisitor.java
+**/TestNewPlanColumnPrune2.java
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Wed Aug 4 17:46:42 2010
@@ -28,31 +28,30 @@ import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
-import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.expression.AndExpression;
-import org.apache.pig.experimental.logical.expression.CastExpression;
-import org.apache.pig.experimental.logical.expression.ConstantExpression;
-import org.apache.pig.experimental.logical.expression.EqualExpression;
-import org.apache.pig.experimental.logical.expression.LogicalExpression;
-import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
-import org.apache.pig.experimental.logical.expression.ProjectExpression;
-import org.apache.pig.experimental.logical.optimizer.UidStamper;
-import org.apache.pig.experimental.logical.relational.LOCogroup;
-import org.apache.pig.experimental.logical.relational.LOForEach;
-import org.apache.pig.experimental.logical.relational.LOGenerate;
-import org.apache.pig.experimental.logical.relational.LOInnerLoad;
-import org.apache.pig.experimental.logical.relational.LOJoin;
-import org.apache.pig.experimental.logical.relational.LOLoad;
-import org.apache.pig.experimental.logical.relational.LOStore;
-import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
-import org.apache.pig.experimental.logical.relational.LogicalSchema;
-import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
-import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.test.utils.LogicalPlanTester;
import junit.framework.TestCase;
@@ -67,19 +66,19 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
assertEquals(3, newPlan.size());
assertEquals(newPlan.getSources().size(), 1);
// check load
LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOLoad.class);
// check filter
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOFilter.class);
- LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+ LogicalExpressionPlan exp = ((org.apache.pig.newplan.logical.relational.LOFilter)op).getFilterPlan();
EqualExpression eq = (EqualExpression)exp.getSources().get(0);
assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
@@ -90,7 +89,7 @@ public class TestLogicalPlanMigrationVis
// check store
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOStore.class);
}
public void testPlanWithCast() throws Exception {
@@ -100,19 +99,19 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
assertEquals(3, newPlan.size());
assertEquals(newPlan.getSources().size(), 1);
// check load
LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOLoad.class);
// check filter
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOFilter.class);
- LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+ LogicalExpressionPlan exp = ((org.apache.pig.newplan.logical.relational.LOFilter)op).getFilterPlan();
EqualExpression eq = (EqualExpression)exp.getSources().get(0);
assertEquals(eq.getLhs().getClass(), CastExpression.class);
@@ -127,7 +126,7 @@ public class TestLogicalPlanMigrationVis
// check store
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOStore.class);
}
public void testJoinPlan() throws Exception {
@@ -139,21 +138,21 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
assertEquals(5, newPlan.size());
assertEquals(newPlan.getSources().size(), 2);
// check load and join
LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOJoin.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOJoin.class);
assertEquals(((LOJoin)op).getJoinType(), LOJoin.JOINTYPE.HASH);
LogicalRelationalOperator l1 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0);
- assertEquals(l1.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+ assertEquals(l1.getClass(), org.apache.pig.newplan.logical.relational.LOLoad.class);
assertEquals(l1.getAlias(), "a");
LogicalRelationalOperator l2 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(1);
- assertEquals(l2.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+ assertEquals(l2.getClass(), org.apache.pig.newplan.logical.relational.LOLoad.class);
assertEquals(l2.getAlias(), "b");
// check join input plans
@@ -175,8 +174,8 @@ public class TestLogicalPlanMigrationVis
// check filter
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
- LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOFilter.class);
+ LogicalExpressionPlan exp = ((org.apache.pig.newplan.logical.relational.LOFilter)op).getFilterPlan();
AndExpression ae = (AndExpression)exp.getSources().get(0);
@@ -196,7 +195,7 @@ public class TestLogicalPlanMigrationVis
// check store
op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
- assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+ assertEquals(op.getClass(), org.apache.pig.newplan.logical.relational.LOStore.class);
}
public void testForeachPlan() throws Exception {
@@ -206,32 +205,34 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
- org.apache.pig.experimental.logical.relational.LogicalPlan expected =
- new org.apache.pig.experimental.logical.relational.LogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan expected =
+ new org.apache.pig.newplan.logical.relational.LogicalPlan();
LogicalSchema aschema = new LogicalSchema();
aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY));
- LOLoad load = new LOLoad(new FileSpec("/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+ LOLoad load = new LOLoad(new FileSpec("/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected, null);
expected.add(load);
LOForEach foreach = new LOForEach(expected);
- org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan innerPlan = new org.apache.pig.newplan.logical.relational.LogicalPlan();
LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach, 0);
innerPlan.add(l1);
LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach, 1);
+ innerPlan.add(l2);
List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>();
+ LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
LogicalExpressionPlan p1 = new LogicalExpressionPlan();
- p1.add(new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0));
+ p1.add(new ProjectExpression(p1, 0, -1, gen));
LogicalExpressionPlan p2 = new LogicalExpressionPlan();
- p2.add(new ProjectExpression(p2, DataType.BYTEARRAY, 1, 0));
+ p2.add(new ProjectExpression(p2, 1, -1, gen));
eps.add(p1);
eps.add(p2);
- LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
+
innerPlan.add(gen);
innerPlan.connect(l1, gen);
innerPlan.connect(l2, gen);
@@ -246,13 +247,6 @@ public class TestLogicalPlanMigrationVis
expected.connect(load, foreach);
expected.connect(foreach, s);
- try {
- UidStamper stamper = new UidStamper(expected);
- stamper.visit();
- }catch(Exception e) {
- throw new VisitorException(e);
- }
-
assertTrue(expected.isEqual(newPlan));
LogicalSchema schema = foreach.getSchema();
@@ -268,7 +262,7 @@ public class TestLogicalPlanMigrationVis
lpt.buildPlan("a = load '/test/d.txt' as (id, d:tuple(v, s));");
LogicalPlan plan = lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSinks().get(0);
LogicalSchema s2 = new LogicalSchema();
@@ -338,10 +332,10 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
- org.apache.pig.experimental.logical.relational.LogicalPlan expected =
- new org.apache.pig.experimental.logical.relational.LogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan expected =
+ new org.apache.pig.newplan.logical.relational.LogicalPlan();
LogicalSchema aschema = new LogicalSchema();
aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
@@ -350,26 +344,28 @@ public class TestLogicalPlanMigrationVis
aschema3.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE));
+ aschema2.setTwoLevelAccessRequired(true);
aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG));
- LOLoad load = new LOLoad(new FileSpec("/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+ LOLoad load = new LOLoad(new FileSpec("/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected, null);
expected.add(load);
LOForEach foreach2 = new LOForEach(expected);
- org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+ org.apache.pig.newplan.logical.relational.LogicalPlan innerPlan = new org.apache.pig.newplan.logical.relational.LogicalPlan();
LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach2, 0);
innerPlan.add(l1);
LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach2, 1);
+ innerPlan.add(l2);
List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>();
+ LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
LogicalExpressionPlan p1 = new LogicalExpressionPlan();
- new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0);
+ new ProjectExpression(p1, 0, -1, gen);
LogicalExpressionPlan p2 = new LogicalExpressionPlan();
- new ProjectExpression(p2, DataType.BAG, 1, 0);
+ new ProjectExpression(p2, 1, -1, gen);
eps.add(p1);
eps.add(p2);
- LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
innerPlan.add(gen);
innerPlan.connect(l1, gen);
innerPlan.connect(l2, gen);
@@ -384,12 +380,10 @@ public class TestLogicalPlanMigrationVis
expected.connect(load, foreach2);
expected.connect(foreach2, s);
- try {
- UidStamper stamper = new UidStamper(expected);
- stamper.visit();
- }catch(Exception e) {
- throw new VisitorException(e);
- }
+
+ System.out.println(newPlan);
+
+ System.out.println(expected);
assertTrue(expected.isEqual(newPlan));
@@ -410,7 +404,7 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
LogicalSchema loadSchema =
((LogicalRelationalOperator)newPlan.getSources().get(0)).getSchema();
@@ -455,7 +449,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan.getSinks().get(0).getClass() );
ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
- assertEquals( loadSchema.getField(0).uid, prj.getUid() );
+ assertEquals( loadSchema.getField(0).uid, prj.getFieldSchema().uid );
assertEquals( 0, prj.getColNum() );
assertEquals( 0, prj.getInputNum() );
}
@@ -467,7 +461,7 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
LogicalSchema loadSchema =
((LogicalRelationalOperator)newPlan.getSources().get(0)).getSchema();
@@ -519,7 +513,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan.getSinks().get(0).getClass() );
ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
- assertEquals( loadSchema.getField(0).uid, prj.getUid() );
+ assertEquals( loadSchema.getField(0).uid, prj.getFieldSchema().uid );
assertEquals( 0, prj.getColNum() );
assertEquals( 0, prj.getInputNum() );
@@ -527,7 +521,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan2.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan2.getSinks().get(0).getClass() );
ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
- assertEquals( loadSchema.getField(1).uid, prj2.getUid() );
+ assertEquals( loadSchema.getField(1).uid, prj2.getFieldSchema().uid );
assertEquals( 1, prj2.getColNum() );
assertEquals( 0, prj2.getInputNum() );
}
@@ -540,7 +534,7 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store c into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
assertEquals( LOCogroup.class, newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0).getClass() );
LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0);
@@ -604,7 +598,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan.getSinks().get(0).getClass() );
ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
- assertEquals( loadSchema.getField(0).uid, prj.getUid() );
+ assertEquals( loadSchema.getField(0).uid, prj.getFieldSchema().uid );
assertEquals( 0, prj.getColNum() );
assertEquals( 0, prj.getInputNum() );
@@ -612,7 +606,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan2.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan2.getSinks().get(0).getClass() );
ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
- assertEquals( load2Schema.getField(0).uid, prj2.getUid() );
+ assertEquals( load2Schema.getField(0).uid, prj2.getFieldSchema().uid );
assertEquals( 0, prj2.getColNum() );
assertEquals( 1, prj2.getInputNum() );
}
@@ -625,7 +619,7 @@ public class TestLogicalPlanMigrationVis
LogicalPlan plan = lpt.buildPlan("store c into '/test/empty';");
// check basics
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migratePlan(plan);
assertEquals( LOCogroup.class, newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0).getClass() );
LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0);
@@ -700,7 +694,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan.getSinks().get(0).getClass() );
ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
- assertEquals( loadSchema.getField(0).uid, prj.getUid() );
+ assertEquals( loadSchema.getField(0).uid, prj.getFieldSchema().uid );
assertEquals( 0, prj.getColNum() );
assertEquals( 0, prj.getInputNum() );
@@ -708,7 +702,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan2.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan2.getSinks().get(0).getClass() );
ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
- assertEquals( loadSchema.getField(1).uid, prj2.getUid() );
+ assertEquals( loadSchema.getField(1).uid, prj2.getFieldSchema().uid );
assertEquals( 1, prj2.getColNum() );
assertEquals( 0, prj2.getInputNum() );
@@ -716,7 +710,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan3.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan3.getSinks().get(0).getClass() );
ProjectExpression prj3 = (ProjectExpression) exprPlan3.getSinks().get(0);
- assertEquals( load2Schema.getField(0).uid, prj3.getUid() );
+ assertEquals( load2Schema.getField(0).uid, prj3.getFieldSchema().uid );
assertEquals( 0, prj3.getColNum() );
assertEquals( 1, prj3.getInputNum() );
@@ -724,7 +718,7 @@ public class TestLogicalPlanMigrationVis
assertEquals( 1, exprPlan4.getSinks().size() );
assertEquals( ProjectExpression.class, exprPlan4.getSinks().get(0).getClass() );
ProjectExpression prj4 = (ProjectExpression) exprPlan4.getSinks().get(0);
- assertEquals( load2Schema.getField(1).uid, prj4.getUid() );
+ assertEquals( load2Schema.getField(1).uid, prj4.getFieldSchema().uid );
assertEquals( 1, prj4.getColNum() );
assertEquals( 1, prj4.getInputNum() );
}
@@ -751,18 +745,12 @@ public class TestLogicalPlanMigrationVis
return uids;
}
- private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+ private org.apache.pig.newplan.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
visitor.visit();
- org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
- try {
- UidStamper stamper = new UidStamper(newPlan);
- stamper.visit();
+ org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
- return newPlan;
- }catch(Exception e) {
- throw new VisitorException(e);
- }
+ return newPlan;
}
}