You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/07/28 20:55:37 UTC
svn commit: r1693142 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer:
MultiQueryOptimizerSpark.java NoopFilterRemover.java
Author: xuefu
Date: Tue Jul 28 18:55:37 2015
New Revision: 1693142
URL: http://svn.apache.org/r1693142
Log:
PIG-4594: Enable TestMultiQuery in spark mode (Liyun via Xuefu)
Added two new files that were missing from the previous commit.
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1693142&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Tue Jul 28 18:55:37 2015
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+/**
+ * Multi-query optimizer for the Spark execution engine.
+ * <p/>
+ * For every splitter operator whose physical plan ends in a {@link POStore}
+ * that each of its splittees reads back through a {@link POLoad} of the same
+ * file, the intermediate store/load pair is removed and the splittees are
+ * folded into the splitter:
+ * <ul>
+ * <li>a single splittee's physical plan is merged directly into the
+ * splitter's plan;</li>
+ * <li>several splittees become sub-plans of a new {@link POSplit} that is
+ * appended as the splitter's leaf.</li>
+ * </ul>
+ * Successors of a merged splittee are re-attached to the splitter.
+ * <p/>
+ * The rewrite is skipped when a splittee has more than one predecessor, when
+ * the splitter's first leaf is not a {@link POStore}, or when some splittee
+ * does not load the stored file (dropping the store would orphan it).
+ */
+public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+    private final String scope;
+    private final NodeIdGenerator nig;
+
+    /**
+     * @param plan the Spark operator plan to optimize; it is walked in
+     *             reverse dependency order
+     */
+    public MultiQueryOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+        nig = NodeIdGenerator.getGenerator();
+        List<SparkOperator> roots = plan.getRoots();
+        // An empty plan has no splitters, so a POSplit (and hence a scope) is never needed.
+        scope = (roots == null || roots.isEmpty())
+                ? null
+                : roots.get(0).getOperatorKey().getScope();
+    }
+
+    /**
+     * Folds the splittees of {@code sparkOp} into it when {@code sparkOp} is a
+     * splitter and all preconditions hold. Nothing is modified unless every
+     * check passes.
+     *
+     * @throws VisitorException wrapping any {@link PlanException} raised while
+     *                          rewriting the plans
+     */
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (!sparkOp.isSplitter()) {
+            return;
+        }
+        try {
+            List<SparkOperator> successors = getPlan().getSuccessors(sparkOp);
+            if (successors == null || successors.isEmpty()) {
+                return;
+            }
+            // Snapshot: removeSplittee() disconnects operators, which may mutate
+            // the list returned by getSuccessors().
+            List<SparkOperator> splittees = new ArrayList<SparkOperator>(successors);
+
+            // If a splittee has more than one predecessor, do not apply the
+            // multi-query optimization. @see TestMultiQueryBasic#testMultiQueryWithFJ_2
+            for (SparkOperator splittee : splittees) {
+                if (getPlan().getPredecessors(splittee).size() > 1) {
+                    return;
+                }
+            }
+
+            POStore poStore = getLeafStore(sparkOp);
+            if (poStore == null) {
+                return;
+            }
+            String storeFile = poStore.getSFile().getFileName();
+            // Removing the store is only safe if every splittee reads it back;
+            // otherwise that splittee would lose its input.
+            for (SparkOperator splittee : splittees) {
+                if (!loadsFile(splittee, storeFile)) {
+                    return;
+                }
+            }
+
+            if (splittees.size() == 1) {
+                mergeSingleSplittee(sparkOp, poStore, splittees.get(0));
+            } else {
+                mergeIntoSplit(sparkOp, poStore, splittees);
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /**
+     * Merges one splittee directly into the splitter. The store is dropped,
+     * the splittee's operators are merged, and every matching load is
+     * replaced by an edge from the store's predecessor to the load's
+     * successors. No POSplit is needed.
+     */
+    private void mergeSingleSplittee(SparkOperator splitter, POStore poStore,
+            SparkOperator splittee) throws PlanException {
+        List<PhysicalOperator> storePreds = splitter.physicalPlan.getPredecessors(poStore);
+        if (storePreds == null || storePreds.isEmpty()) {
+            // Nothing feeds the store, so there is nothing to wire the splittee to.
+            return;
+        }
+        PhysicalOperator storePred = storePreds.get(0);
+        String storeFile = poStore.getSFile().getFileName();
+        // Capture the splittee roots before its operators are merged into the splitter.
+        List<PhysicalOperator> splitteeRoots =
+                new ArrayList<PhysicalOperator>(splittee.physicalPlan.getRoots());
+
+        splitter.physicalPlan.remove(poStore); // remove unnecessary store
+        splitter.physicalPlan.merge(splittee.physicalPlan);
+        for (PhysicalOperator root : splitteeRoots) {
+            if (!isLoadOf(root, storeFile)) {
+                continue;
+            }
+            List<PhysicalOperator> loadSuccs = splitter.physicalPlan.getSuccessors(root);
+            List<PhysicalOperator> targets = (loadSuccs == null)
+                    ? new ArrayList<PhysicalOperator>()
+                    : new ArrayList<PhysicalOperator>(loadSuccs);
+            splitter.physicalPlan.remove(root); // remove unnecessary load
+            for (PhysicalOperator target : targets) {
+                splitter.physicalPlan.forceConnect(storePred, target);
+            }
+        }
+        addSubPlanPropertiesToParent(splitter, splittee);
+        removeSplittee(getPlan(), splitter, splittee);
+    }
+
+    /**
+     * Merges several splittees into the splitter. The store is dropped, and
+     * each splittee's plan, with its matching loads removed, is added once as
+     * a sub-plan of a new POSplit. That POSplit becomes the splitter's leaf.
+     */
+    private void mergeIntoSplit(SparkOperator splitter, POStore poStore,
+            List<SparkOperator> splittees) throws PlanException {
+        String storeFile = poStore.getSFile().getFileName();
+        splitter.physicalPlan.remove(poStore); // remove unnecessary store
+        POSplit split = getSplit();
+        for (SparkOperator splittee : splittees) {
+            List<PhysicalOperator> roots =
+                    new ArrayList<PhysicalOperator>(splittee.physicalPlan.getRoots());
+            for (PhysicalOperator root : roots) {
+                if (isLoadOf(root, storeFile)) {
+                    splittee.physicalPlan.remove(root); // remove unnecessary load
+                }
+            }
+            // Each splittee contributes exactly one sub-plan, however many loads matched.
+            split.addPlan(splittee.physicalPlan);
+            addSubPlanPropertiesToParent(splitter, splittee);
+            removeSplittee(getPlan(), splitter, splittee);
+        }
+        splitter.physicalPlan.addAsLeaf(split);
+    }
+
+    /** Returns the first leaf of the operator's physical plan if it is a POStore, else null. */
+    private static POStore getLeafStore(SparkOperator sparkOp) {
+        List<PhysicalOperator> leaves = sparkOp.physicalPlan.getLeaves();
+        if (leaves == null || leaves.isEmpty()) {
+            return null;
+        }
+        PhysicalOperator leaf = leaves.get(0);
+        return (leaf instanceof POStore) ? (POStore) leaf : null;
+    }
+
+    /** True if some root of the operator's physical plan is a POLoad of {@code fileName}. */
+    private static boolean loadsFile(SparkOperator sparkOp, String fileName) {
+        for (PhysicalOperator root : sparkOp.physicalPlan.getRoots()) {
+            if (isLoadOf(root, fileName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** True if {@code op} is a POLoad whose file name equals {@code fileName}. */
+    private static boolean isLoadOf(PhysicalOperator op, String fileName) {
+        return fileName != null
+                && op instanceof POLoad
+                && fileName.equals(((POLoad) op).getLFile().getFileName());
+    }
+
+    /**
+     * Removes {@code splittee} from {@code plan}. Any successors it had are
+     * reconnected to {@code splitter}.
+     */
+    private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
+            SparkOperator splittee) throws PlanException {
+        List<SparkOperator> liveSuccs = plan.getSuccessors(splittee);
+        if (liveSuccs != null) {
+            // Copy first: disconnect() may mutate the list returned by getSuccessors().
+            List<SparkOperator> succs = new ArrayList<SparkOperator>(liveSuccs);
+            plan.disconnect(splitter, splittee);
+            for (SparkOperator succ : succs) {
+                plan.disconnect(splittee, succ);
+                plan.connect(splitter, succ);
+            }
+        }
+        plan.remove(splittee);
+    }
+
+    /** Creates a new POSplit with a fresh operator key in this plan's scope. */
+    private POSplit getSplit() {
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }
+
+    /**
+     * Copies properties from a merged sub-plan operator onto its parent. These
+     * are cross keys, features, requested parallelism (the larger value wins),
+     * UDFs and scalars. Afterwards the sub-plan operator's requested
+     * parallelism is set by reference to the parent.
+     */
+    public static void addSubPlanPropertiesToParent(SparkOperator parentOper, SparkOperator subPlanOper) {
+        // Copy only map side properties. For eg: crossKeys.
+        // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
+        if (subPlanOper.getCrossKeys() != null) {
+            for (String key : subPlanOper.getCrossKeys()) {
+                parentOper.addCrossKey(key);
+            }
+        }
+        parentOper.copyFeatures(subPlanOper, null);
+
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
+        }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
+        parentOper.UDFs.addAll(subPlanOper.UDFs);
+        parentOper.scalars.addAll(subPlanOper.scalars);
+    }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java?rev=1693142&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java Tue Jul 28 18:55:37 2015
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes these.
+ * <p/>
+ * The condition we look for is POFilters with a constant boolean
+ * (true) expression as it's plan.
+ */
+public class NoopFilterRemover extends SparkOpPlanVisitor {
+ private Log log = LogFactory.getLog(NoopFilterRemover.class);
+
+ public NoopFilterRemover(SparkOperPlan plan) {
+ super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+ List<POFilter> filters = PlanHelper.getPhysicalOperators(sparkOp
+ .physicalPlan, POFilter.class);
+ for (POFilter filter : filters) {
+ PhysicalPlan filterPlan = filter.getPlan();
+ if (filterPlan.size() == 1) {
+ PhysicalOperator fp = filterPlan.getRoots().get(0);
+ if (fp instanceof ConstantExpression) {
+ ConstantExpression exp = (ConstantExpression) fp;
+ Object value = exp.getValue();
+ if (value instanceof Boolean) {
+ Boolean filterValue = (Boolean) value;
+ if (filterValue) {
+ removeFilter(filter, sparkOp.physicalPlan);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void removeFilter(POFilter filter, PhysicalPlan plan) {
+ if (plan.size() > 1) {
+ try {
+ List<PhysicalOperator> fInputs = filter.getInputs();
+ List<PhysicalOperator> sucs = plan.getSuccessors(filter);
+
+ plan.removeAndReconnect(filter);
+ if (sucs != null && sucs.size() != 0) {
+ for (PhysicalOperator suc : sucs) {
+ suc.setInputs(fInputs);
+ }
+ }
+ } catch (PlanException pe) {
+ log.info("Couldn't remove a filter in optimizer: " + pe.getMessage());
+ }
+ }
+ }
+}