Posted to commits@pig.apache.org by xu...@apache.org on 2015/07/28 20:55:37 UTC

svn commit: r1693142 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer: MultiQueryOptimizerSpark.java NoopFilterRemover.java

Author: xuefu
Date: Tue Jul 28 18:55:37 2015
New Revision: 1693142

URL: http://svn.apache.org/r1693142
Log:
PIG-4594: Enable TestMultiQuery in spark mode (Liyun via Xuefu)
Added two new files that were missing in the previous commit.

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1693142&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Tue Jul 28 18:55:37 2015
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+/**
+ * MultiQueryOptimizer for Spark.
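+ * <p>
+ * Merges splittees into their splitter so the shared input is computed
+ * only once: the temporary POStore/POLoad pair linking splitter and
+ * splittee is removed, and when a splitter has several splittees their
+ * plans are attached to a single POSplit.
+ * <p>
+ * A minimal sketch of a script that yields a splitter with two
+ * splittees (illustrative only, not part of this commit):
+ * <pre>
+ *   a = LOAD 'input' AS (x, y);
+ *   STORE a INTO 'out1';
+ *   STORE a INTO 'out2';
+ * </pre>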
+ */
+public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+    private String scope;
+    private NodeIdGenerator nig;
+
+    public MultiQueryOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+        nig = NodeIdGenerator.getGenerator();
+        List<SparkOperator> roots = plan.getRoots();
+        scope = roots.get(0).getOperatorKey().getScope();
+    }
+
+
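+    /**
+     * If sparkOp is a splitter, merges its splittee(s) into it: a single
+     * splittee is merged directly into the splitter's physical plan, while
+     * multiple splittees are attached to one POSplit.
+     */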
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            if (!sparkOp.isSplitter()) {
+                return;
+            }
+
+            List<SparkOperator> splittees = getPlan().getSuccessors(sparkOp);
+
+            if (splittees == null) {
+                return;
+            }
+
+            // If any splittee has more than one predecessor, do not apply multi-query optimization.
+            // @see TestMultiQueryBasic#testMultiQueryWithFJ_2
+            for (SparkOperator splittee : splittees) {
+                if (getPlan().getPredecessors(splittee).size() > 1) {
+                    return;
+                }
+            }
+
+            if (splittees.size() == 1) {
+                // We don't need a POSplit here; we can merge the splittee into the splitter.
+                SparkOperator singleSplitee = splittees.get(0);
+                POStore poStore = null;
+                PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
+                if (firstNodeLeaf instanceof POStore) {
+                    poStore = (POStore) firstNodeLeaf;
+                }
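+                // Capture the store's predecessor before removing the store, so the
+                // splittee's plan can be spliced onto it after the merge below.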
+                PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
+                if (poStore != null) {
+                    sparkOp.physicalPlan.remove(poStore); // remove the unnecessary store
+                }
+                List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
+                sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
+                for (int j = 0; j < firstNodeRoots.size(); j++) {
+                    PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
+                    POLoad poLoad = null;
+                    if (firstNodeRoot instanceof POLoad && poStore != null) {
+                        poLoad = (POLoad) firstNodeRoot;
+                        if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+                            PhysicalOperator firstNodeRootSucc = sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
+                            sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
+                            sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
+                        }
+                    }
+                }
+                addSubPlanPropertiesToParent(sparkOp, singleSplitee);
+                removeSplittee(getPlan(), sparkOp, singleSplitee);
+            } else {
+                // If there is more than one splittee, create a POSplit, merge the physical
+                // plan of each splittee into it, and remove the splittees from the Spark plan.
+                List<PhysicalOperator> firstNodeLeaves = sparkOp.physicalPlan.getLeaves();
+                PhysicalOperator firstNodeLeaf = firstNodeLeaves.size() > 0 ? firstNodeLeaves.get(0) : null;
+                POStore poStore = null;
+                if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
+                    poStore = (POStore) firstNodeLeaf;
+                    sparkOp.physicalPlan.remove(poStore); // remove the unnecessary store
+                    POSplit split = getSplit();
+                    ArrayList<SparkOperator> spliteesCopy = new ArrayList<SparkOperator>(splittees);
+                    for (SparkOperator splitee : spliteesCopy) {
+                        List<PhysicalOperator> firstNodeRoots = splitee.physicalPlan.getRoots();
+                        for (int i = 0; i < firstNodeRoots.size(); i++) {
+                            if (firstNodeRoots.get(i) instanceof POLoad) {
+                                POLoad poLoad = (POLoad) firstNodeRoots.get(i);
+                                if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+                                    splitee.physicalPlan.remove(poLoad); // remove the unnecessary load
+                                    split.addPlan(splitee.physicalPlan);
+                                    addSubPlanPropertiesToParent(sparkOp, splitee);
+                                    removeSplittee(getPlan(), sparkOp, splitee);
+                                }
+                            }
+                        }
+                    }
+
+                    sparkOp.physicalPlan.addAsLeaf(split);
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
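+    /**
+     * Disconnects the splittee from the splitter, reconnects each of the
+     * splittee's successors directly to the splitter, and removes the
+     * splittee from the Spark plan.
+     */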
+    private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
+                                SparkOperator splittee) throws PlanException {
+        if (plan.getSuccessors(splittee) != null) {
+            List<SparkOperator> succs = new ArrayList<SparkOperator>(plan.getSuccessors(splittee));
+            plan.disconnect(splitter, splittee);
+            for (SparkOperator succSparkOperator : succs) {
+                plan.disconnect(splittee, succSparkOperator);
+                plan.connect(splitter, succSparkOperator);
+            }
+        }
+        plan.remove(splittee);
+    }
+
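+    /**
+     * Creates a new POSplit with a fresh operator key in the plan's scope.
+     */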
+    private POSplit getSplit() {
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }
+
+    public static void addSubPlanPropertiesToParent(SparkOperator parentOper, SparkOperator subPlanOper) {
+        // Copy only map-side properties, e.g. crossKeys.
+        // Do not copy reduce-side-specific properties, e.g. useSecondaryKey, segmentBelow, sortOrder, etc.
+        if (subPlanOper.getCrossKeys() != null) {
+            for (String key : subPlanOper.getCrossKeys()) {
+                parentOper.addCrossKey(key);
+            }
+        }
+        parentOper.copyFeatures(subPlanOper, null);
+
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        parentOper.UDFs.addAll(subPlanOper.UDFs);
+        parentOper.scalars.addAll(subPlanOper.scalars);
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1693142&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java Tue Jul 28 18:55:37 2015
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons, splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes them.
+ * <p/>
+ * The condition we look for is a POFilter with a constant boolean
+ * (true) expression as its plan.
+ */
+public class NoopFilterRemover extends SparkOpPlanVisitor {
+    private Log log = LogFactory.getLog(NoopFilterRemover.class);
+
+    public NoopFilterRemover(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
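+    /**
+     * Removes every POFilter in this operator's physical plan whose filter
+     * plan is a single ConstantExpression evaluating to Boolean.TRUE.
+     */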
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        List<POFilter> filters = PlanHelper.getPhysicalOperators(sparkOp.physicalPlan, POFilter.class);
+        for (POFilter filter : filters) {
+            PhysicalPlan filterPlan = filter.getPlan();
+            if (filterPlan.size() == 1) {
+                PhysicalOperator fp = filterPlan.getRoots().get(0);
+                if (fp instanceof ConstantExpression) {
+                    ConstantExpression exp = (ConstantExpression) fp;
+                    Object value = exp.getValue();
+                    if (value instanceof Boolean) {
+                        Boolean filterValue = (Boolean) value;
+                        if (filterValue) {
+                            removeFilter(filter, sparkOp.physicalPlan);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
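+    /**
+     * Removes the no-op filter from the plan and rewires the filter's former
+     * inputs to each successor so the dataflow remains intact.
+     */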
+    private void removeFilter(POFilter filter, PhysicalPlan plan) {
+        if (plan.size() > 1) {
+            try {
+                List<PhysicalOperator> fInputs = filter.getInputs();
+                List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
+                plan.removeAndReconnect(filter);
+                if (sucs != null && sucs.size() != 0) {
+                    for (PhysicalOperator suc : sucs) {
+                        suc.setInputs(fInputs);
+                    }
+                }
+            } catch (PlanException pe) {
+                log.info("Couldn't remove a filter in optimizer: " + pe.getMessage());
+            }
+        }
+    }
+}