You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/03/02 20:12:37 UTC
drill git commit: DRILL-2236: Optimize hash inner join by swapping
inputs based on row count comparison. Add a planner option to enable/disable
this feature.
Repository: drill
Updated Branches:
refs/heads/master 9c0738d94 -> 3442215fd
DRILL-2236: Optimize hash inner join by swapping inputs based on row count comparison. Add a planner option to enable/disable this feature.
Revise code based on review comments.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3442215f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3442215f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3442215f
Branch: refs/heads/master
Commit: 3442215fd91e700f659bc055cd7c05b623bc59b3
Parents: 9c0738d
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Thu Jan 29 13:24:28 2015 -0800
Committer: Jinfeng Ni <jn...@maprtech.com>
Committed: Mon Mar 2 10:03:31 2015 -0800
----------------------------------------------------------------------
.../exec/planner/physical/HashJoinPrel.java | 54 +++++++++----
.../drill/exec/planner/physical/JoinPrel.java | 4 +-
.../exec/planner/physical/MergeJoinPrel.java | 2 +-
.../exec/planner/physical/PlannerSettings.java | 11 +++
.../physical/explain/NumberingRelWriter.java | 7 ++
.../physical/visitor/SwapHashJoinVisitor.java | 79 ++++++++++++++++++++
.../planner/sql/handlers/DefaultSqlHandler.java | 13 +++-
.../server/options/SystemOptionManager.java | 2 +
8 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index a3c42de..f63057f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical;
import java.io.IOException;
import java.util.List;
+import net.hydromatic.optiq.runtime.FlatLists;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.ExecConstants;
@@ -46,18 +47,24 @@ import com.google.common.collect.Lists;
public class HashJoinPrel extends JoinPrel {
+ // When true, getPhysicalOperator() hands the right input (and rightKeys) to the HashJoinPOP as its
+ // left side and the left input (and leftKeys) as its right side. Preserved by copy(); settable via setSwapped().
+ private boolean swapped = false;
+
+ /** Creates a hash join whose left/right inputs are not swapped. */
public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
- JoinRelType joinType) throws InvalidRelException {
- super(cluster, traits, left, right, condition, joinType);
+ JoinRelType joinType) throws InvalidRelException {
+ this(cluster, traits, left, right, condition, joinType, false);
+ }
+ /**
+ * Creates a hash join.
+ *
+ * @param swapped whether the left/right inputs (and their join keys) are exchanged when the
+ *     physical operator is built
+ */
+ public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean swapped) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType);
+ this.swapped = swapped;
+ // NOTE(review): presumably fills the leftKeys/rightKeys lists from the equi-join condition; confirm.
RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
}
-
@Override
public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
try {
- return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType);
+ return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType, this.swapped);
}catch (InvalidRelException e) {
throw new AssertionError(e);
}
@@ -100,11 +107,32 @@ public class HashJoinPrel extends JoinPrel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    // A swapped join presents its inputs to the HashJoinPOP in reverse order, together with the
+    // matching key lists, so that the hash table ends up on the smaller (original left) input.
+    if (swapped) {
+      return getHashJoinPop(creator, right, left, rightKeys, leftKeys);
+    }
+    return getHashJoinPop(creator, left, right, leftKeys, rightKeys);
+  }
+
+ // Reports SelectionVectorMode.DEFAULT as the supported encodings.
+ // NOTE(review): presumably the selection-vector modes accepted from the inputs; confirm against Prel.
+ @Override
+ public SelectionVectorMode[] getSupportedEncodings() {
+ return SelectionVectorMode.DEFAULT;
+ }
+
+ // Reports SelectionVectorMode.NONE as this operator's own encoding
+ // (presumably: output batches carry no selection vector).
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ private PhysicalOperator getHashJoinPop(PhysicalPlanCreator creator, RelNode left, RelNode right,
+ List<Integer> leftKeys, List<Integer> rightKeys) throws IOException{
final List<String> fields = getRowType().getFieldNames();
assert isUnique(fields);
- final int leftCount = left.getRowType().getFieldCount();
- final List<String> leftFields = fields.subList(0, leftCount);
- final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+ final List<String> leftFields = left.getRowType().getFieldNames();
+ final List<String> rightFields = right.getRowType().getFieldNames();
PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
@@ -113,20 +141,18 @@ public class HashJoinPrel extends JoinPrel {
List<JoinCondition> conditions = Lists.newArrayList();
- buildJoinConditions(conditions, leftFields, rightFields);
+ buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys);
HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
return creator.addMetadata(this, hjoin);
}
- @Override
- public SelectionVectorMode[] getSupportedEncodings() {
- return SelectionVectorMode.DEFAULT;
+ /**
+  * Sets whether getPhysicalOperator() should exchange this join's left/right inputs (and their
+  * join keys) when building the HashJoinPOP.
+  */
+ public void setSwapped(boolean swapped) {
+ this.swapped = swapped;
}
- @Override
- public SelectionVectorMode getEncoding() {
- return SelectionVectorMode.NONE;
+ /** @return true if this join's left/right inputs are exchanged when its physical operator is built */
+ public boolean isSwapped() {
+ return this.swapped;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index 3541db7..bfecd06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -118,7 +118,9 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel{
*/
protected void buildJoinConditions(List<JoinCondition> conditions,
List<String> leftFields,
- List<String> rightFields) {
+ List<String> rightFields,
+ List<Integer> leftKeys,
+ List<Integer> rightKeys) {
List<RexNode> conjuncts = RelOptUtil.conjunctions(this.getCondition());
short i=0;
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 394a82c..b7e86e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -99,7 +99,7 @@ public class MergeJoinPrel extends JoinPrel {
List<JoinCondition> conditions = Lists.newArrayList();
- buildJoinConditions(conditions, leftFields, rightFields);
+ buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys);
MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype);
return creator.addMetadata(this, mjoin);
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 96be07d..bbfbbcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -49,6 +49,9 @@ public class PlannerSettings implements Context{
public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true);
+ // Toggles swapping the inputs of INNER hash joins based on a row-count comparison
+ // (second argument presumably the default: true).
+ public static final OptionValidator HASH_JOIN_SWAP = new BooleanValidator("planner.enable_hashjoin_swap", true);
+ // Swap margin stored as a percentage; getHashJoinSwapMarginFactor() divides it by 100.
+ // NOTE(review): arguments presumably mean min 0, max 100, default 10; confirm against RangeDoubleValidator.
+ public static final OptionValidator HASH_JOIN_SWAP_MARGIN_FACTOR = new RangeDoubleValidator("planner.join.hash_join_swap_margin_factor", 0, 100, 10d);
+
public static final OptionValidator IDENTIFIER_MAX_LENGTH =
new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum length is needed because option names are identifiers themselves */,
Integer.MAX_VALUE, DEFAULT_IDENTIFIER_MAX_LENGTH);
@@ -117,6 +120,14 @@ public class PlannerSettings implements Context{
return options.getOption(HASH_SINGLE_KEY.getOptionName()).bool_val;
}
+  /** Reports whether INNER hash joins may have their inputs swapped ({@code planner.enable_hashjoin_swap}). */
+  public boolean isHashJoinSwapEnabled() {
+    final String optionName = HASH_JOIN_SWAP.getOptionName();
+    return options.getOption(optionName).bool_val;
+  }
+
+  /**
+   * Returns the hash-join swap margin as a fraction. The option itself is stored as a percentage,
+   * so its value is divided by 100 (for example, 10 becomes 0.1).
+   */
+  public double getHashJoinSwapMarginFactor() {
+    final String optionName = HASH_JOIN_SWAP_MARGIN_FACTOR.getOptionName();
+    final double percentage = options.getOption(optionName).float_val;
+    return percentage / 100d;
+  }
+
public long getBroadcastThreshold() {
return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
index 6522ad9..387a442 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
@@ -23,8 +23,10 @@ import java.util.List;
import java.util.Map;
import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.runtime.FlatLists;
import net.hydromatic.optiq.runtime.Spacer;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
import org.eigenbase.rel.RelNode;
@@ -62,6 +64,10 @@ class NumberingRelWriter implements RelWriter {
RelNode rel,
List<Pair<String, Object>> values) {
List<RelNode> inputs = rel.getInputs();
+ if (rel instanceof HashJoinPrel && ((HashJoinPrel) rel).isSwapped()) {
+ HashJoinPrel joinPrel = (HashJoinPrel) rel;
+ inputs = FlatLists.of(joinPrel.getRight(), joinPrel.getLeft());
+ }
if (!RelMetadataQuery.isVisibleInExplain(
rel,
@@ -106,6 +112,7 @@ class NumberingRelWriter implements RelWriter {
}
}
if (detailLevel == SqlExplainLevel.ALL_ATTRIBUTES) {
+ s.append(" : rowType = " + rel.getRowType().toString());
s.append(": rowcount = ")
.append(RelMetadataQuery.getRowCount(rel))
.append(", cumulative cost = ")
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
new file mode 100644
index 0000000..18d5e60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.JoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Visits a Prel tree and marks INNER {@link HashJoinPrel} nodes as "swapped", so that their
+ * left/right inputs are exchanged when the physical operator is created. A join is marked when
+ * {@code leftRowCount < (1 + marginFactor) * rightRowCount}.
+ * Because the hash table is built on the RIGHT side, the purpose is to keep the bigger dataset
+ * off the right side.
+ *
+ * @see org.apache.drill.exec.planner.physical.HashJoinPrel
+ */
+public class SwapHashJoinVisitor extends BasePrelVisitor<Prel, Double, RuntimeException> {
+
+  // The visitor holds no per-call state, so one shared instance is reused; final so it cannot be replaced.
+  private static final SwapHashJoinVisitor INSTANCE = new SwapHashJoinVisitor();
+
+  /**
+   * Rebuilds the plan rooted at {@code prel}, marking qualifying INNER hash joins as swapped.
+   *
+   * @param prel root of the physical plan
+   * @param marginFactor fraction (e.g. 0.1) by which the left row count may exceed the right row
+   *     count and still trigger a swap; must not be null (it is unboxed)
+   * @return the rebuilt plan root
+   */
+  public static Prel swapHashJoin(Prel prel, Double marginFactor) {
+    return prel.accept(INSTANCE, marginFactor);
+  }
+
+  private SwapHashJoinVisitor() {
+  }
+
+  /** Copies {@code prel} on top of its recursively visited children. */
+  @Override
+  public Prel visitPrel(Prel prel, Double value) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    for (Prel child : prel) {
+      children.add(child.accept(this, value));
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  /** Copies the join, then marks the copy as swapped if it is an INNER hash join passing the row-count test. */
+  @Override
+  public Prel visitJoin(JoinPrel prel, Double value) throws RuntimeException {
+    JoinPrel newJoin = (JoinPrel) visitPrel(prel, value);
+
+    // Type-check the node actually being cast (the copy), and test the cheap join type before row counts.
+    if (newJoin instanceof HashJoinPrel && newJoin.getJoinType() == JoinRelType.INNER
+        && newJoin.getLeft().getRows() < (1 + value.doubleValue()) * newJoin.getRight().getRows()) {
+      ((HashJoinPrel) newJoin).setSwapped(true);
+    }
+
+    return newJoin;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 35e7f5c..232778a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -56,6 +56,7 @@ import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
+import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
import org.apache.drill.exec.server.options.OptionManager;
@@ -246,12 +247,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
/*
- * 1.1) Break up all expressions with complex outputs into their own project operations
+ * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
+ * We want to have smaller dataset on the right side, since hash table builds on right side.
+ */
+ if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
+ phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode, new Double(context.getPlannerSettings().getHashJoinSwapMarginFactor()));
+ }
+
+ /*
+ * 1.2) Break up all expressions with complex outputs into their own project operations
*/
phyRelNode = ((Prel) phyRelNode).accept(new SplitUpComplexExpressions(planner.getTypeFactory(), context.getDrillOperatorTable(), context.getPlannerSettings().functionImplementationRegistry), null);
/*
- * 1.2) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
+ * 1.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
*/
phyRelNode = ((Prel) phyRelNode).accept(new RewriteProjectToFlatten(planner.getTypeFactory(), context.getDrillOperatorTable()), null);
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index aa0a5ad..3d3e96f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager {
PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,
PlannerSettings.HASH_SINGLE_KEY,
PlannerSettings.IDENTIFIER_MAX_LENGTH,
+ PlannerSettings.HASH_JOIN_SWAP,
+ PlannerSettings.HASH_JOIN_SWAP_MARGIN_FACTOR,
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,