You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/18 23:20:09 UTC
svn commit: r911616 [4/7] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/
src/org/apach...
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/MergeFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeFilter extends Rule {
+
+ public MergeFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new MergeFilterTransformer();
+ }
+
+ public class MergeFilterTransformer extends Transformer {
+
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ // if this filter is followed by another filter, we should combine them
+ if (succeds != null && succeds.size() == 1) {
+ if (succeds.get(0) instanceof LOFilter) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+ subPlan.add(filter);
+
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
+ LOFilter next = (LOFilter)succeds.get(0);
+ combineFilterCond(filter, next);
+ Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+ List<Operator> ll = currentPlan.getSuccessors(next);
+ if (ll!= null && ll.size()>0) {
+ Operator op = ll.get(0);
+ Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+ currentPlan.connect(filter, p1.first, op, p2.second);
+ subPlan.add(op);
+ }
+
+ currentPlan.remove(next);
+ }
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ // combine the condition of two filters. The condition of second filter
+ // is added into the condition of first filter with an AND operator.
+ private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+ LogicalExpressionPlan p1 = f1.getFilterPlan();
+ LogicalExpressionPlan p2 = f2.getFilterPlan();
+ LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+
+ // add existing operators
+ Iterator<Operator> iter = p1.getOperators();
+ while(iter.hasNext()) {
+ andPlan.add(iter.next());
+ }
+
+ iter = p2.getOperators();
+ while(iter.hasNext()) {
+ andPlan.add(iter.next());
+ }
+
+ // add all connections
+ iter = p1.getOperators();
+ while(iter.hasNext()) {
+ Operator n = iter.next();
+ List<Operator> l = p1.getPredecessors(n);
+ if (l != null) {
+ for(Operator op: l) {
+ andPlan.connect(op, n);
+ }
+ }
+ }
+
+ iter = p2.getOperators();
+ while(iter.hasNext()) {
+ Operator n = iter.next();
+ List<Operator> l = p2.getPredecessors(n);
+ if (l != null) {
+ for(Operator op: l) {
+ andPlan.connect(op, n);
+ }
+ }
+ }
+
+ // create an AND
+ new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));
+
+ f1.setFilterPlan(andPlan);
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is filter operator
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op = new LOFilter(plan);
+ plan.add(op);
+
+ return plan;
+ }
+}
+
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class PushUpFilter extends Rule {
+
+ public PushUpFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new PushUpFilterTransformer();
+ }
+
+ public class PushUpFilterTransformer extends Transformer {
+
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ // check if it is inner join
+ LOJoin join = (LOJoin)matched.getSources().get(0);
+ boolean[] innerFlags = join.getInnerFlags();
+ for(boolean inner: innerFlags) {
+ if (!inner){
+ return false;
+ }
+ }
+
+ Operator next = matched.getSinks().get(0);
+ while(next != null && next instanceof LOFilter) {
+ LOFilter filter = (LOFilter)next;
+ LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+
+ // collect all uids used in the filter plan
+ Set<Long> uids = new HashSet<Long>();
+ Iterator<Operator> iter = filterPlan.getOperators();
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof ProjectExpression) {
+ long uid = ((ProjectExpression)op).getUid();
+ uids.add(uid);
+ }
+ }
+
+ List<Operator> preds = currentPlan.getPredecessors(join);
+
+ for(int j=0; j<preds.size(); j++) {
+ if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
+ return true;
+ }
+ }
+
+ // if current filter can not move up, check next filter
+ List<Operator> l = currentPlan.getSuccessors(filter);
+ if (l != null) {
+ next = l.get(0);
+ } else {
+ next = null;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ LOJoin join = (LOJoin)matched.getSources().get(0);
+ subPlan.add(join);
+
+ Operator next = matched.getSinks().get(0);
+ while(next != null && next instanceof LOFilter) {
+ LOFilter filter = (LOFilter)next;
+ subPlan.add(filter);
+
+ LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+
+ // collect all uids used in the filter plan
+ Set<Long> uids = new HashSet<Long>();
+ Iterator<Operator> iter = filterPlan.getOperators();
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof ProjectExpression) {
+ long uid = ((ProjectExpression)op).getUid();
+ uids.add(uid);
+ }
+ }
+
+ // find the farthest predecessor that has all the fields
+ LogicalRelationalOperator input = join;
+ List<Operator> preds = currentPlan.getPredecessors(input);
+ while(preds != null) {
+ boolean found = false;
+ for(int j=0; j<preds.size(); j++) {
+ if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
+ input = (LogicalRelationalOperator)preds.get(j);
+ subPlan.add(input);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ break;
+ }
+ preds = currentPlan.getPredecessors(input);
+ }
+
+ if (input != join) {
+ Operator pred = currentPlan.getPredecessors(filter).get(0);
+ Operator succed = currentPlan.getSuccessors(filter).get(0);
+ subPlan.add(succed);
+
+ Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter);
+ Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(pred, p1.first, succed, p2.second);
+
+ succed = currentPlan.getSuccessors(input).get(0);
+ Pair<Integer, Integer> p3 = currentPlan.disconnect(input, succed);
+ currentPlan.connect(input, p3.first, filter, 0);
+ currentPlan.connect(filter, 0, succed, p3.second);
+
+ return;
+ }
+
+ List<Operator> l = currentPlan.getSuccessors(filter);
+ if (l != null) {
+ next = l.get(0);
+ } else {
+ next = null;
+ }
+ }
+ }
+
+ // check if a relational operator contains all of the specified uids
+ private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+ LogicalSchema schema = op.getSchema();
+ List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+ Set<Long> all = new HashSet<Long>();
+ for(LogicalSchema.LogicalFieldSchema f:fields) {
+ all.add(f.uid);
+ }
+
+ return all.containsAll(uids);
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is join -> filter
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op1 = new LOJoin(plan);
+ LogicalRelationalOperator op2 = new LOFilter(plan);
+ plan.add(op1);
+ plan.add(op2);
+ plan.connect(op1, op2);
+
+ return plan;
+ }
+}
+
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/rules/SplitFilter.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class SplitFilter extends Rule {
+
+ public SplitFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new SplitFilterTransformer();
+ }
+
+ public class SplitFilterTransformer extends Transformer {
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (root instanceof AndExpression) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ // split one LOFilter into 2 by "AND"
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (!(root instanceof AndExpression)) {
+ return;
+ }
+ LogicalExpressionPlan op1 = new LogicalExpressionPlan();
+ op1.add((LogicalExpression)cond.getSuccessors(root).get(0));
+ fillSubPlan(cond, op1, (LogicalExpression)cond.getSuccessors(root).get(0));
+
+ LogicalExpressionPlan op2 = new LogicalExpressionPlan();
+ op2.add((LogicalExpression)cond.getSuccessors(root).get(1));
+ fillSubPlan(cond, op2, (LogicalExpression)cond.getSuccessors(root).get(1));
+
+ filter.setFilterPlan(op1);
+ LOFilter filter2 = new LOFilter((LogicalPlan)currentPlan, op2);
+ currentPlan.add(filter2);
+
+ Operator succed = null;
+ try {
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ if (succeds != null) {
+ succed = succeds.get(0);
+ subPlan.add(succed);
+ Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(filter2, 0, succed, p.second);
+ currentPlan.connect(filter, p.first, filter2, 0);
+ } else {
+ currentPlan.connect(filter, 0, filter2, 0);
+ }
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
+
+ subPlan.add(filter);
+ subPlan.add(filter2);
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ private void fillSubPlan(OperatorPlan origPlan,
+ OperatorPlan subPlan, Operator startOp) throws IOException {
+
+ List<Operator> l = origPlan.getSuccessors(startOp);
+ if (l != null) {
+ for(Operator le: l) {
+ subPlan.add(le);
+ subPlan.connect(startOp, le);
+ fillSubPlan(origPlan, subPlan, le);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is filter
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op2 = new LOFilter(plan);
+ plan.add(op2);
+
+ return plan;
+ }
+}
+
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Thu Feb 18 22:20:07 2010
@@ -60,9 +60,9 @@
* @return all operators in the plan that have no predecessors, or
* an empty list if the plan is empty.
*/
- public List<Operator> getRoots() {
+ public List<Operator> getSources() {
if (roots.size() == 0 && ops.size() > 0) {
- for (Operator op : ops) {
+ for (Operator op : ops) {
if (toEdges.get(op) == null) {
roots.add(op);
}
@@ -76,7 +76,7 @@
* @return all operators in the plan that have no successors, or
* an empty list if the plan is empty.
*/
- public List<Operator> getLeaves() {
+ public List<Operator> getSinks() {
if (leaves.size() == 0 && ops.size() > 0) {
for (Operator op : ops) {
if (fromEdges.get(op) == null) {
@@ -200,5 +200,62 @@
public Iterator<Operator> getOperators() {
return ops.iterator();
}
-
+
+ public boolean isEqual(OperatorPlan other) {
+ return isEqual(this, other);
+ }
+
+ private static boolean checkPredecessors(Operator op1,
+ Operator op2) {
+ try {
+ List<Operator> preds = op1.getPlan().getPredecessors(op1);
+ List<Operator> otherPreds = op2.getPlan().getPredecessors(op2);
+ if (preds == null && otherPreds == null) {
+ // intentionally blank
+ } else if (preds == null || otherPreds == null) {
+ return false;
+ } else {
+ if (preds.size() != otherPreds.size()) return false;
+ for (int i = 0; i < preds.size(); i++) {
+ Operator p1 = preds.get(i);
+ Operator p2 = otherPreds.get(i);
+ if (!p1.isEqual(p2)) return false;
+ if (!checkPredecessors(p1, p2)) return false;
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static boolean isEqual(OperatorPlan p1, OperatorPlan p2) {
+ if (p1 == p2) {
+ return true;
+ }
+
+ if (p1 != null && p2 != null) {
+ List<Operator> leaves = p1.getSinks();
+ List<Operator> otherLeaves = p2.getSinks();
+ if (leaves.size() != otherLeaves.size()) return false;
+ // Must find some leaf that is equal to each leaf. There is no
+ // guarantee leaves will be returned in any particular order.
+ boolean foundAll = true;
+ for (Operator op1 : leaves) {
+ boolean foundOne = false;
+ for (Operator op2 : otherLeaves) {
+ if (op1.isEqual(op2) && checkPredecessors(op1, op2)) {
+ foundOne = true;
+ break;
+ }
+ }
+ foundAll &= foundOne;
+ if (!foundAll) return false;
+ }
+ return foundAll;
+ }
+
+ return false;
+ }
+
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -64,7 +64,7 @@
List<Operator> fifo = new ArrayList<Operator>();
Set<Operator> seen = new HashSet<Operator>();
- List<Operator> leaves = plan.getLeaves();
+ List<Operator> leaves = plan.getSinks();
if (leaves == null) return;
for (Operator op : leaves) {
doAllPredecessors(op, seen, fifo);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DepthFirstWalker.java Thu Feb 18 22:20:07 2010
@@ -45,7 +45,7 @@
*/
@Override
public void walk(PlanVisitor visitor) throws IOException {
- List<Operator> roots = plan.getRoots();
+ List<Operator> roots = plan.getSources();
Set<Operator> seen = new HashSet<Operator>();
depthFirst(null, roots, seen, visitor);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/Operator.java Thu Feb 18 22:20:07 2010
@@ -18,6 +18,7 @@
package org.apache.pig.experimental.plan;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,7 @@
protected String name;
protected OperatorPlan plan; // plan that contains this operator
protected Map<String, Object> annotations;
+ protected final int hashPrime = 31;
public Operator(String n, OperatorPlan p) {
name = n;
@@ -36,8 +38,9 @@
/**
* Accept a visitor at this node in the graph.
* @param v Visitor to accept.
+ * @throws IOException
*/
- public abstract void accept(PlanVisitor v);
+ public abstract void accept(PlanVisitor v) throws IOException;
public String getName() {
return name;
@@ -70,4 +73,12 @@
return annotations.get(key);
}
+ /**
+ * This is like a shallow equals comparison.
+ * It returns true if two operators have equivalent properties even if they are
+ * different objects. Here properties mean equivalent plan and equivalent name.
+ * @param operator the operator to compare against
+ * @return true if the two operators have equivalent properties, else false
+ */
+ public abstract boolean isEqual(Operator operator);
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorPlan.java Thu Feb 18 22:20:07 2010
@@ -19,14 +19,9 @@
package org.apache.pig.experimental.plan;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.util.Pair;
public interface OperatorPlan {
@@ -41,20 +36,20 @@
* @return all operators in the plan that have no predecessors, or
* an empty list if the plan is empty.
*/
- public List<Operator> getRoots();
+ public List<Operator> getSources();
/**
* Get all operators in the plan that have no successors.
* @return all operators in the plan that have no successors, or
* an empty list if the plan is empty.
*/
- public List<Operator> getLeaves();
+ public List<Operator> getSinks();
/**
* For a given operator, get all operators immediately before it in the
* plan.
* @param op operator to fetch predecessors of
- * @return list of all operators imeediately before op, or an empty list
+ * @return list of all operators immediately before op, or an empty list
* if op is a root.
* @throws IOException if op is not in the plan.
*/
@@ -63,7 +58,7 @@
/**
* For a given operator, get all operators immediately after it.
* @param op operator to fetch successors of
- * @return list of all operators imeediately after op, or an empty list
+ * @return list of all operators immediately after op, or an empty list
* if op is a leaf.
* @throws IOException if op is not in the plan.
*/
@@ -117,4 +112,13 @@
* @return an iterator of all operators in this plan
*/
public Iterator<Operator> getOperators();
+
+ /**
+ * This is like a shallow comparison.
+ * Two plans are equal if they have equivalent operators and equivalent
+ * structure.
+ * @param other plan to compare against
+ * @return true if both plans are equivalent, false otherwise
+ */
+ public boolean isEqual( OperatorPlan other );
}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/OperatorSubPlan.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Class to represent a view of a plan. The view contains a subset of the plan.
+ * All the operators returned from the view are the same objects to the operators
+ * in its base plan. It is used to represent match results.
+ *
+ */
+public class OperatorSubPlan implements OperatorPlan {
+
+ private OperatorPlan basePlan;
+ private List<Operator> roots;
+ private List<Operator> leaves;
+ private Set<Operator> operators;
+
+ public OperatorSubPlan(OperatorPlan base) {
+ basePlan = base;
+ roots = new ArrayList<Operator>();
+ leaves = new ArrayList<Operator>();
+ operators = new HashSet<Operator>();
+ }
+
+ public OperatorPlan getBasePlan() {
+ return basePlan;
+ }
+
+ public void add(Operator op) {
+ operators.add(op);
+ }
+
+ public void connect(Operator from, int fromPos, Operator to, int toPos) {
+ throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+ }
+
+ public void connect(Operator from, Operator to) {
+ throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+ }
+
+ public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+ throw new UnsupportedOperationException("disconnect() can not be called on OperatorSubPlan");
+ }
+
+ public List<Operator> getSinks() {
+ if (leaves.size() == 0 && operators.size() > 0) {
+ for (Operator op : operators) {
+ try {
+ if (getSuccessors(op) == null) {
+ leaves.add(op);
+ }
+ }catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return leaves;
+ }
+
+ public Iterator<Operator> getOperators() {
+ return operators.iterator();
+ }
+
+ public List<Operator> getPredecessors(Operator op) throws IOException {
+ List<Operator> l = basePlan.getPredecessors(op);
+ List<Operator> list = null;
+ if (l != null) {
+ for(Operator oper: l) {
+ if (operators.contains(oper)) {
+ if (list == null) {
+ list = new ArrayList<Operator>();
+ }
+ list.add(oper);
+ }
+ }
+ }
+
+ return list;
+ }
+
+ public List<Operator> getSources() {
+ if (roots.size() == 0 && operators.size() > 0) {
+ for (Operator op : operators) {
+ try {
+ if (getPredecessors(op) == null) {
+ roots.add(op);
+ }
+ }catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return roots;
+ }
+
+ public List<Operator> getSuccessors(Operator op) throws IOException {
+ List<Operator> l = basePlan.getSuccessors(op);
+ List<Operator> list = null;
+ if (l != null) {
+ for(Operator oper: l) {
+ if (operators.contains(oper)) {
+ if (list == null) {
+ list = new ArrayList<Operator>();
+ }
+ list.add(oper);
+ }
+ }
+ }
+
+ return list;
+ }
+
+ public void remove(Operator op) throws IOException {
+ operators.remove(op);
+ leaves.clear();
+ roots.clear();
+ }
+
+ public int size() {
+ return operators.size();
+ }
+
+ @Override
+ public boolean isEqual(OperatorPlan other) {
+ return BaseOperatorPlan.isEqual(this, other);
+ }
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanEdge.java Thu Feb 18 22:20:07 2010
@@ -83,7 +83,8 @@
Operator keeper = null;
for (int j = 0; i.hasNext(); j++) {
keeper = i.next();
- if (keeper.equals(value)) {
+ //if (keeper.equals(value)) {
+ if (keeper == value) {
i.remove();
index = j;
break;
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/PlanVisitor.java Thu Feb 18 22:20:07 2010
@@ -31,6 +31,9 @@
*/
public abstract class PlanVisitor {
+ // TODO Remove this scope value
+ final protected static String DEFAULT_SCOPE = "scope";
+
protected OperatorPlan plan;
/**
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -58,7 +58,7 @@
List<Operator> fifo = new ArrayList<Operator>();
Set<Operator> seen = new HashSet<Operator>();
- List<Operator> roots = plan.getRoots();
+ List<Operator> roots = plan.getSources();
if (roots == null) return;
for (Operator op : roots) {
doAllSuccessors(op, seen, fifo);
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/SubtreeDependencyOrderWalker.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+ /**
+  * A {@link DependencyOrderWalker} that restricts the walk to the operators
+  * collected by {@code doAllPredecessors} starting from a given start node,
+  * i.e. the start node's predecessor subtree. Operators are visited in the
+  * order {@code doAllPredecessors} collects them.
+  */
+ public class SubtreeDependencyOrderWalker extends DependencyOrderWalker {
+     /** Node whose predecessor subtree is walked; null if none was given. */
+     private final Operator startNode;
+
+     /**
+      * Creates a walker without a start node. {@link #walk(PlanVisitor)} then
+      * walks the whole plan, exactly like {@link DependencyOrderWalker}.
+      *
+      * @param plan plan to walk
+      */
+     public SubtreeDependencyOrderWalker(OperatorPlan plan) {
+         this(plan, null);
+     }
+
+     /**
+      * Creates a walker that visits only the predecessor subtree of
+      * {@code startNode}.
+      *
+      * @param plan plan to walk
+      * @param startNode operator whose predecessor subtree is visited
+      */
+     public SubtreeDependencyOrderWalker(OperatorPlan plan, Operator startNode) {
+         super(plan);
+         this.startNode = startNode;
+     }
+
+     /**
+      * Visits every operator collected from the start node's predecessor
+      * subtree. Without a start node the whole plan is walked instead.
+      *
+      * @param visitor visitor passed to each operator's accept()
+      * @throws IOException propagated from an operator's accept() or from
+      *         the predecessor lookup
+      */
+     @Override
+     public void walk(PlanVisitor visitor) throws IOException {
+         if (startNode == null) {
+             // There is no subtree to restrict the walk to, so fall back to
+             // the parent walker, which covers the entire plan.
+             super.walk(visitor);
+             return;
+         }
+
+         List<Operator> fifo = new ArrayList<Operator>();
+         Set<Operator> seen = new HashSet<Operator>();
+
+         // Collect startNode together with all of its predecessors.
+         doAllPredecessors(startNode, seen, fifo);
+
+         for (Operator op : fifo) {
+             op.accept(visitor);
+         }
+     }
+ }
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java Thu Feb 18 22:20:07 2010
@@ -37,7 +37,7 @@
* Each rule is has two parts: a pattern and and associated transformer.
* Transformers have two important functions: check(), and transform().
* The pattern describes a pattern of node types that the optimizer will
- * look ot match. If that match is found anywhere in the plan, then check()
+ * look to match. If that match is found anywhere in the plan, then check()
* will be called. check() allows the rule to look more in depth at the
* matched pattern and decide whether the rule should be run or not. For
* example, one might design a rule to push filters above join that would
@@ -74,7 +74,13 @@
maxIter = (iterations < 1 ? defaultIterations : iterations);
}
- public void addPlanTransformListener(PlanTransformListener listener) {
+ /**
+ * Adds a listener to the optimization. This listener will be fired
+ * after each rule transforms a plan. Listeners are guaranteed to
+ * be fired in the order they are added.
+ * @param listener the listener to notify after each rule transformation
+ */
+ protected void addPlanTransformListener(PlanTransformListener listener) {
listeners.add(listener);
}
@@ -87,6 +93,7 @@
* @throws OptimizerException
*/
public void optimize() throws IOException {
+
for (Set<Rule> rs : ruleSets) {
boolean sawMatch = false;
int numIterations = 0;
@@ -101,7 +108,7 @@
sawMatch = true;
transformer.transform(m);
for(PlanTransformListener l: listeners) {
- l.transformed(plan, transformer);
+ l.transformed(plan, transformer.reportChanges());
}
}
}
@@ -109,5 +116,5 @@
}
} while(sawMatch && ++numIterations < maxIter);
}
- }
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java Thu Feb 18 22:20:07 2010
@@ -18,6 +18,8 @@
package org.apache.pig.experimental.plan.optimizer;
+import java.io.IOException;
+
import org.apache.pig.experimental.plan.OperatorPlan;
/**
@@ -26,9 +28,10 @@
public interface PlanTransformListener {
/**
* the listener that is notified after a plan is transformed
- * @param plan the plan that is transformed
- * @param transformer the transformer that transforms this plan
+ * @param fp the full plan that has been transformed
+ * @param tp a plan containing only the operators that have been transformed
+ * @throws IOException
*/
- public void transformed(OperatorPlan plan, Transformer transformer);
+ public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException;
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Rule.java Thu Feb 18 22:20:07 2010
@@ -28,33 +28,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.experimental.plan.BaseOperatorPlan;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.experimental.plan.PlanVisitor;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
/**
* Rules describe a pattern of operators. They also reference a Transformer.
* If the pattern of operators is found one or more times in the provided plan,
* then the optimizer will use the associated Transformer to transform the
* plan.
- *
- * The syntax for rules is (x(y, z)) where x is a base node, and y and z are
- * node that precede x. So the graph for the Pig Latin script:
- * A = load;
- * B = load;
- * C = join A, B;
- * D = filter C
- * would be: filter(join(load, load));
- *
- * Rules with multiple end points (leaves) are expressed as (x(), y()) where
- * both x and y are leaves.
- *
- * It is expected that the name given to each node in the pattern exactly
- * matches the name of the class of the node in the Plan to be matched. So
- * to build a rule that matched a join followed by a filter in the logical
- * plan, the pattern would be LOFilter(LOJoin).
*/
public abstract class Rule {
@@ -121,7 +103,7 @@
public List<OperatorPlan> match(OperatorPlan plan) {
currentPlan = plan;
- List<Operator> leaves = pattern.getLeaves();
+ List<Operator> leaves = pattern.getSinks();
Iterator<Operator> iter = plan.getOperators();
List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();
@@ -162,11 +144,11 @@
siblings = plan.getSuccessors(s);
}else{
// for a root, we get its siblings by getting all roots
- siblings = plan.getRoots();
+ siblings = plan.getSources();
}
}catch(IOException e) {
// not going to happen
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
int index = siblings.indexOf(op);
if (siblings.size()-index < leaves.size()) {
@@ -234,21 +216,14 @@
}
- private class PatternMatchOperatorPlan implements OperatorPlan {
- OperatorPlan parent;
- List<Operator> roots;
- List<Operator> leaves;
- Set<Operator> operators;
-
- public PatternMatchOperatorPlan(OperatorPlan parent) {
- this.parent = parent;
- roots = new ArrayList<Operator>();
- leaves = new ArrayList<Operator>();
- operators = new HashSet<Operator>();
+ private class PatternMatchOperatorPlan extends OperatorSubPlan {
+
+ public PatternMatchOperatorPlan(OperatorPlan basePlan) {
+ super(basePlan);
}
protected boolean check(List<Operator> planOps) throws IOException {
- List<Operator> patternOps = pattern.getLeaves();
+ List<Operator> patternOps = pattern.getSinks();
if (planOps.size() != patternOps.size()) {
return false;
}
@@ -258,10 +233,13 @@
if (!check(planOps.get(i), patternOps.get(i), s)) {
return false;
}
- operators.addAll(s);
+ Iterator<Operator> iter = s.iterator();
+ while(iter.hasNext()) {
+ add(iter.next());
+ }
}
- if (operators.size() == pattern.size()) {
+ if (size() == pattern.size()) {
return true;
}
@@ -283,13 +261,9 @@
if (!match(planOp, patternOp)) {
return false;
}
-
- if (pattern.getLeaves().contains(patternOp) && !leaves.contains(planOp)) {
- leaves.add(planOp);
- }
-
+
// check if their predecessors match
- List<Operator> preds1 = parent.getPredecessors(planOp);
+ List<Operator> preds1 = getBasePlan().getPredecessors(planOp);
List<Operator> preds2 = pattern.getPredecessors(patternOp);
if (preds1 == null && preds2 != null) {
return false;
@@ -300,27 +274,25 @@
}
// we've reached the root of the pattern, so a match is found
- if (preds2 == null || preds2.size() == 0) {
- if (!roots.contains(planOp)) {
- roots.add(planOp);
- }
+ if (preds2 == null || preds2.size() == 0) {
opers.push(planOp);
return true;
}
- int index = 0;
- boolean match = true;
+ int index = 0;
// look for predecessors
while(index < preds1.size()) {
+ boolean match = true;
if (match(preds1.get(index), preds2.get(0))) {
if ( (preds1.size() - index) < preds2.size()) {
return false;
}
-
+
+ int oldSize = opers.size();
for(int i=0; i<preds2.size(); i++) {
if (!check(preds1.get(index+i), preds2.get(i), opers)) {
- for(int j=0; j<i; j++) {
- opers.pop();
+ for(int j=opers.size(); j>oldSize; j--) {
+ opers.pop();
}
match = false;
break;
@@ -335,67 +307,6 @@
}
return false;
- }
-
- public void add(Operator op) {
- throw new UnsupportedOperationException("add() can not be called on PatternMatchOperatorPlan");
- }
-
- public void connect(Operator from, int fromPos, Operator to, int toPos) {
- throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
- }
-
- public void connect(Operator from, Operator to) {
- throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
- }
-
- public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
- throw new UnsupportedOperationException("disconnect() can not be called on PatternMatchOperatorPlan");
- }
-
- public List<Operator> getLeaves() {
- return leaves;
- }
-
- public Iterator<Operator> getOperators() {
- return operators.iterator();
- }
-
- public List<Operator> getPredecessors(Operator op) throws IOException {
- List<Operator> l = parent.getPredecessors(op);
- List<Operator> list = new ArrayList<Operator>();
- for(Operator oper: l) {
- if (operators.contains(oper)) {
- list.add(oper);
- }
- }
-
- return list;
- }
-
- public List<Operator> getRoots() {
- return roots;
- }
-
- public List<Operator> getSuccessors(Operator op) throws IOException {
- List<Operator> l = parent.getSuccessors(op);
- List<Operator> list = new ArrayList<Operator>();
- for(Operator oper: l) {
- if (operators.contains(oper)) {
- list.add(oper);
- }
- }
-
- return list;
- }
-
- public void remove(Operator op) throws IOException {
- throw new UnsupportedOperationException("remove() can not be called on PatternMatchOperatorPlan");
- }
-
- public int size() {
- return operators.size();
- }
-
+ }
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/optimizer/Transformer.java Thu Feb 18 22:20:07 2010
@@ -43,5 +43,14 @@
* @throws IOException
*/
public abstract void transform(OperatorPlan matched) throws IOException;
+
+ /**
+ * Report which parts of the plan were changed; the optimizer calls this
+ * right after {@link #transform(OperatorPlan)} and hands the result to its
+ * listeners. This lets listeners know which part of the plan to revisit in
+ * order to update schemas, annotations, etc. Nodes that were removed are
+ * not included in the returned plan; it contains only nodes that were
+ * added or moved.
+ * @return OperatorPlan that describes just the changed nodes.
+ */
+ public abstract OperatorPlan reportChanges();
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileSpec.java Thu Feb 18 22:20:07 2010
@@ -20,7 +20,6 @@
import java.io.Serializable;
import org.apache.pig.FuncSpec;
-import org.apache.pig.impl.PigContext;
/**
@@ -59,4 +58,20 @@
public int getSize() {
throw new UnsupportedOperationException("File Size not implemented yet");
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof FileSpec) {
+ FileSpec ofs = (FileSpec)other;
+ if (!fileName.equals(ofs.fileName)) return false;
+ return funcSpec.equals(ofs.funcSpec);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return getFuncName().hashCode() + fileName.hashCode();
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java Thu Feb 18 22:20:07 2010
@@ -20,11 +20,10 @@
import java.util.List;
-import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
/**
* This abstract class represents the logical Binary Expression Operator
@@ -35,7 +34,7 @@
public abstract class BinaryExpressionOperator extends ExpressionOperator {
private static final long serialVersionUID = 2L;
- private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
+ // private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
/**
* @param plan
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Thu Feb 18 22:20:07 2010
@@ -40,7 +40,11 @@
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanCloner;
import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
/**
@@ -138,7 +142,7 @@
// Limit cannot be pushed up
if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
- predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOJoin)
+ predecessor instanceof LODistinct || predecessor instanceof LOJoin)
{
return;
}
@@ -234,6 +238,48 @@
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}
+ // Limit and OrderBy (LOSort) can be separated by split
+ else if (predecessor instanceof LOSplitOutput) {
+ if(mode == ExecType.LOCAL) {
+ //We don't need this optimisation to happen in the local mode.
+ //so we do nothing here.
+ } else {
+ List<LogicalOperator> grandparants = mPlan
+ .getPredecessors(predecessor);
+ // After insertion of splitters, any node in the plan can
+ // have at most one predecessor
+ if (grandparants != null && grandparants.size() != 0
+ && grandparants.get(0) instanceof LOSplit) {
+ List<LogicalOperator> greatGrandparants = mPlan
+ .getPredecessors(grandparants.get(0));
+ if (greatGrandparants != null
+ && greatGrandparants.size() != 0
+ && greatGrandparants.get(0) instanceof LOSort) {
+ LOSort sort = (LOSort)greatGrandparants.get(0);
+ LOSort newSort = new LOSort(
+ sort.getPlan(),
+ new OperatorKey(
+ sort.getOperatorKey().scope,
+ NodeIdGenerator
+ .getGenerator()
+ .getNextNodeId(
+ sort.getOperatorKey().scope)),
+ sort.getSortColPlans(),
+ sort.getAscendingCols(),
+ sort.getUserFunc());
+
+ newSort.setLimit(limit.getLimit());
+ try {
+ mPlan.replace(limit, newSort);
+ } catch (PlanException e) {
+ int errCode = 2012;
+ String msg = "Can not replace LOLimit with LOSort after splitter";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+ }
+ }
else {
int errCode = 2013;
String msg = "Moving LOLimit in front of " + predecessor.getClass().getSimpleName() + " is not implemented";
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Thu Feb 18 22:20:07 2010
@@ -93,7 +93,7 @@
errCode = 4000;
break;
}
- errMsg = "Output specification is invalid: "+outLoc;
+ errMsg = "Output specification '"+outLoc+"' is invalid or already exists";
msgCollector.collect(errMsg, MessageType.Error) ;
throw new PlanValidationException(errMsg, errCode, errSrc, ioe);
} catch (InterruptedException ie) {
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java Thu Feb 18 22:20:07 2010
@@ -39,13 +39,40 @@
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
public class TestCombiner extends TestCase {
-
-
MiniCluster cluster = MiniCluster.buildCluster();
+
+ @Test
+ public void testSuccessiveUserFuncs1() throws Exception {
+     assertCombinerNotUsed(
+             "f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); ");
+ }
+
+ @Test
+ public void testSuccessiveUserFuncs2() throws Exception {
+     String dummyUDF = JiraPig1030.class.getName();
+     assertCombinerNotUsed("f = foreach c generate COUNT(" + dummyUDF
+             + "(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); ");
+ }
+
+ /**
+  * Builds the plan {@code load -> group -> <foreachStatement> -> store} and
+  * asserts that the first root of the resulting MapReduce plan has an empty
+  * combine plan.
+  *
+  * @param foreachStatement Pig Latin statement defining relation {@code f}
+  *        from grouped relation {@code c}
+  */
+ private void assertCombinerNotUsed(String foreachStatement) throws Exception {
+     LogicalPlanTester tester = new LogicalPlanTester();
+     tester.buildPlan("a = load 'students.txt' as (c1,c2,c3,c4); ");
+     tester.buildPlan("c = group a by c2; ");
+     tester.buildPlan(foreachStatement);
+     LogicalPlan lp = tester.buildPlan("store f into 'out';");
+     PigContext pc = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()).getPigContext();
+     assertTrue("combiner should not be used for: " + foreachStatement,
+             Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc), pc)
+                     .getRoots().get(0).combinePlan.isEmpty());
+ }
@Test
public void testOnCluster() throws Exception {
@@ -117,7 +144,6 @@
return inputFileName;
}
-
@Test
public void testNoCombinerUse() {
// To simulate this, we will have two input files
@@ -373,7 +399,7 @@
return "";
}
}
-
+
@Test
public void testJiraPig1030() {
// test that combiner is NOT invoked when
@@ -416,4 +442,5 @@
}
}
}
+
}