Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/31 03:33:09 UTC
svn commit: r1635656 [1/5] - in /hive/branches/spark:
itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/lib/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/
ql/src/java/org/apache/h...
Author: xuefu
Date: Fri Oct 31 02:33:08 2014
New Revision: 1635656
URL: http://svn.apache.org/r1635656
Log:
HIVE-8202: Support SMB Join for Hive on Spark [Spark Branch] (Szehon via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_16.q.out
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_13.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_14.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_15.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_2.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_3.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_4.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_5.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_6.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_7.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_8.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_9.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket2.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket3.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket4.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part2.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_17.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_20.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_21.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_25.q.out
Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Fri Oct 31 02:33:08 2014
@@ -463,6 +463,7 @@ spark.query.files=add_part_multiple.q, \
auto_sortmerge_join_13.q, \
auto_sortmerge_join_14.q, \
auto_sortmerge_join_15.q, \
+ auto_sortmerge_join_16.q, \
auto_sortmerge_join_2.q, \
auto_sortmerge_join_3.q, \
auto_sortmerge_join_4.q, \
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.Stack;
+
+/**
+ * Rule that matches a particular type of node.
+ */
+public class TypeRule implements Rule {
+
+ private final Class<?> nodeClass;
+
+ public TypeRule(Class<?> nodeClass) {
+ this.nodeClass = nodeClass;
+ }
+
+ @Override
+ public int cost(Stack<Node> stack) throws SemanticException {
+ if (stack == null) {
+ return -1;
+ }
+ if (nodeClass.isInstance(stack.peek())) {
+ return 1;
+ }
+ return -1;
+ }
+
+ @Override
+ public String getName() {
+ return nodeClass.getName();
+ }
+}
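
TypeRule complements RuleRegExp: rather than matching on operator names, it fires when the node on top of the walk stack is an instance of the given class, which tells subclasses such as SMBMapJoinOperator apart from MapJoinOperator even though they share an operator name. A minimal sketch of wiring it into a rule dispatcher, mirroring the SparkCompiler hunk later in this commit (the sketch class and method names are illustrative, not part of the commit):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.TypeRule;
    import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;

    final class TypeRuleSketch {
      // Build a dispatcher whose single rule fires when the walker's current
      // node is an SMBMapJoinOperator, as in SparkCompiler's second pass.
      static Dispatcher buildSmbDispatcher(NodeProcessorCtx procCtx) {
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        opRules.put(new TypeRule(SMBMapJoinOperator.class),
            SparkSortMergeJoinFactory.getTableScanMapJoin());
        return new DefaultRuleDispatcher(null, opRules, procCtx);
      }
    }
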
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Oct 31 02:33:08 2014
@@ -111,7 +111,7 @@ public class Optimizer {
// If hive.optimize.bucketmapjoin.sortedmerge is set, add both
// BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
- && !isTezExecEngine && !isSparkExecEngine) {
+ && !isTezExecEngine) {
if (!bucketMapJoinOptimizer) {
// No need to add BucketMapJoinOptimizer twice
transformations.add(new BucketMapJoinOptimizer());
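
The effect of dropping the isSparkExecEngine check above: with Spark as the execution engine, hive.optimize.bucketmapjoin.sortedmerge is now honored and the bucket/sorted-merge optimizers are added (Tez still bypasses them). A minimal sketch of the gating flag, assuming only the HiveConf variable named in the hunk (the sketch class name is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    final class SmbConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Flag checked in Optimizer.java above; after this commit it is
        // honored when the execution engine is Spark.
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true);
        System.out.println(HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN));
      }
    }
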
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+* Operator factory for Spark SMBJoin processing.
+*/
+public final class SparkSortMergeJoinFactory {
+
+ private SparkSortMergeJoinFactory() {
+ // prevent instantiation
+ }
+
+ /**
+ * Get the branch from which we are invoked (while walking). See the diagram below.
+ * We are at the SMBJoinOp and could have come from the TS of any of the input tables.
+ */
+ public static int getPositionParent(SMBMapJoinOperator op,
+ Stack<Node> stack) {
+ int size = stack.size();
+ assert size >= 2 && stack.get(size - 1) == op;
+ Operator<? extends OperatorDesc> parent =
+ (Operator<? extends OperatorDesc>) stack.get(size - 2);
+ List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
+ int pos = parOp.indexOf(parent);
+ return pos;
+ }
+
+ /**
+ * SortMergeMapJoin processor; its input is an SMBJoinOp that is part of a MapWork:
+ *
+ * MapWork:
+ *
+ * (Big) (Small) (Small)
+ * TS TS TS
+ * \ | /
+ * \ DS DS
+ * \ | /
+ * SMBJoinOP
+ *
+ * 1. Initializes the MapWork's aliasToWork, pointing to the big table's TS.
+ * 2. Adds the bucketing information to the MapWork.
+ * 3. Adds localWork to the MapWork, with localWork's aliasToWork pointing to the small table's TS.
+ */
+ private static class SortMergeJoinProcessor implements NodeProcessor {
+
+ public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
+ if (currMapJoinOp != null) {
+ Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+ currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+ if (aliasBucketFileNameMapping != null) {
+ MapredLocalWork localPlan = plan.getMapLocalWork();
+ if (localPlan == null) {
+ localPlan = currMapJoinOp.getConf().getLocalWork();
+ } else {
+ // local plan exists; merge the SMBMapJoinOperator's local work into it
+ MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
+ if (smbLocalWork != null) {
+ localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+ localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
+ }
+ }
+
+ if (localPlan == null) {
+ return;
+ }
+ plan.setMapLocalWork(null);
+ currMapJoinOp.getConf().setLocalWork(localPlan);
+
+ BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+ localPlan.setBucketMapjoinContext(bucketMJCxt);
+ bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+ bucketMJCxt.setBucketFileNameMapping(
+ currMapJoinOp.getConf().getBigTableBucketNumMapping());
+ localPlan.setInputFileChangeSensitive(true);
+ bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+ bucketMJCxt
+ .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+ bucketMJCxt.setBigTablePartSpecToFileMapping(
+ currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+
+ plan.setUseBucketizedHiveInputFormat(true);
+
+ }
+ }
+ }
+
+ /**
+ * Initialize the mapWork.
+ *
+ * @param opProcCtx
+ * processing context
+ */
+ private static void initSMBJoinPlan(MapWork mapWork,
+ GenSparkProcContext opProcCtx, boolean local)
+ throws SemanticException {
+ TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
+ String currAliasId = findAliasId(opProcCtx, ts);
+ GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+ opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
+ }
+
+ private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
+ for (String alias : opProcCtx.topOps.keySet()) {
+ if (opProcCtx.topOps.get(alias) == ts) {
+ return alias;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * 1. Initializes the MapWork's aliasToWork, pointing to the big table's TS.
+ * 2. Adds the bucketing information to the MapWork.
+ * 3. Adds localWork to the MapWork, with localWork's aliasToWork pointing to the small table's TS.
+ */
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
+ GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
+
+ SparkTask currTask = ctx.currentTask;
+
+ // find the branch on which this processor was invoked
+ int pos = getPositionParent(mapJoin, stack);
+ boolean local = pos != mapJoin.getConf().getPosBigTable();
+
+ MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
+ initSMBJoinPlan(mapWork, ctx, local);
+
+ // add the bucketing info and local work to the associated mapWork
+ setupBucketMapJoinInfo(mapWork, mapJoin);
+
+ // local aliases need not hand the context over any further
+ return false;
+ }
+ }
+
+ public static NodeProcessor getTableScanMapJoin() {
+ return new SortMergeJoinProcessor();
+ }
+}
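
The core of process() above is a single branch test: the walker reaches the SMBMapJoinOperator from one of its parent branches, and any branch other than the big table's contributes small-table ("local") work. A hedged restatement as a standalone helper (class and method names are illustrative):

    import java.util.Stack;

    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;

    final class SmbBranchSketch {
      // True when the branch we walked in on is not the big table's branch,
      // i.e. it contributes small-table local work.
      static boolean isLocalBranch(SMBMapJoinOperator mapJoin, Stack<Node> stack) {
        int pos = SparkSortMergeJoinFactory.getPositionParent(mapJoin, stack);
        return pos != mapJoin.getConf().getPosBigTable();
      }
    }
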
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Fri Oct 31 02:33:08 2014
@@ -18,20 +18,13 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -49,6 +42,14 @@ import org.apache.hadoop.hive.ql.plan.Op
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* GenSparkProcContext maintains information about the tasks and operators
* as we walk the operator tree to break them into SparkTasks.
@@ -100,6 +101,9 @@ public class GenSparkProcContext impleme
// map that says which mapjoin belongs to which work item
public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
+ // a map to keep track of which MapWork item holds which SMBMapJoinOp
+ public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+
// a map to keep track of which root generated which work
public final Map<Operator<?>, BaseWork> rootToWorkMap;
@@ -134,13 +138,6 @@ public class GenSparkProcContext impleme
// This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
public final Map<String, Operator<? extends OperatorDesc>> topOps;
- // Keep track of the current table alias (from last TableScan)
- public String currentAliasId;
-
- // Keep track of the current Table-Scan.
- public TableScanOperator currentTs;
-
-
@SuppressWarnings("unchecked")
public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -158,6 +155,7 @@ public class GenSparkProcContext impleme
this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+ this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();
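
The new smbJoinWorkMap is the handoff between the two compile passes: GenSparkWork (first pass) records the deferred big-table MapWork under its SMBMapJoinOperator, and SparkSortMergeJoinFactory (second pass) looks it up to finish initialization. A condensed sketch of that contract (class and method names are illustrative, not part of the commit):

    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    final class SmbWorkMapSketch {
      // First pass: at most one MapWork may be registered per SMB join.
      static void record(GenSparkProcContext ctx, SMBMapJoinOperator smbOp,
          MapWork work) throws SemanticException {
        if (ctx.smbJoinWorkMap.get(smbOp) != null) {
          throw new SemanticException(
              "Each SMBMapJoin should be associated only with one MapWork");
        }
        ctx.smbJoinWorkMap.put(smbOp, work);
      }

      // Second pass: retrieve the deferred MapWork to complete its setup.
      static MapWork lookup(GenSparkProcContext ctx, SMBMapJoinOperator smbOp) {
        return ctx.smbJoinWorkMap.get(smbOp);
      }
    }
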
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Fri Oct 31 02:33:08 2014
@@ -18,21 +18,14 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -49,15 +42,21 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -94,7 +93,7 @@ public class GenSparkUtils {
return unionWork;
}
- public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) {
+ public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) throws SemanticException {
Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
"AssertionError: expected root.getParentOperators() to be non-empty");
@@ -128,10 +127,19 @@ public class GenSparkUtils {
}
if (reduceWork.getReducer() instanceof JoinOperator) {
- //reduce-side join
+ //reduce-side join, use MR-style shuffle
edgeProp.setMRShuffle();
}
+ //If it's a FileSink to bucketed files, also use MR-style shuffle to get a taskId compatible with the bucket name
+ FileSinkOperator fso = getChildOperator(reduceWork.getReducer(), FileSinkOperator.class);
+ if (fso != null) {
+ String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(hive_metastoreConstants.BUCKET_COUNT);
+ if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+ edgeProp.setMRShuffle();
+ }
+ }
+
sparkWork.connect(
context.preceedingWork,
reduceWork, edgeProp);
@@ -158,7 +166,12 @@ public class GenSparkUtils {
}
public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
- SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+ SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+ return createMapWork(context, root, sparkWork, partitions, false);
+ }
+
+ public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
+ SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
Preconditions.checkArgument(root.getParentOperators().isEmpty(),
"AssertionError: expected root.getParentOperators() to be empty");
MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -170,7 +183,9 @@ public class GenSparkUtils {
root.getClass().getName());
String alias = ((TableScanOperator)root).getConf().getAlias();
- setupMapWork(mapWork, context, partitions, root, alias);
+ if (!deferSetup) {
+ setupMapWork(mapWork, context, partitions, root, alias);
+ }
// add new item to the Spark work
sparkWork.add(mapWork);
@@ -322,27 +337,17 @@ public class GenSparkUtils {
return true;
}
-
- /**
- * Is an operator of the given class a child of the given operator. This is more flexible
- * than GraphWalker to tell apart subclasses such as SMBMapJoinOp vs MapJoinOp that have a common name.
- * @param op parent operator to start search
- * @param klazz given class
- * @return
- * @throws SemanticException
- */
- public static Operator<?> getChildOperator(Operator<?> op, Class klazz) throws SemanticException {
+ public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
if (klazz.isInstance(op)) {
- return op;
+ return (T) op;
}
List<Operator<?>> childOperators = op.getChildOperators();
for (Operator<?> childOp : childOperators) {
- Operator result = getChildOperator(childOp, klazz);
+ T result = getChildOperator(childOp, klazz);
if (result != null) {
return result;
}
}
return null;
}
-
}
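
The generified getChildOperator now returns the operator already typed, and the commit uses it to find a FileSinkOperator under a reducer and force MR-style shuffle when the sink writes bucketed output, so Spark task IDs stay compatible with the expected bucket file names. A hedged sketch of that check as a standalone predicate (names are illustrative):

    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;

    final class BucketSinkSketch {
      // True when a FileSinkOperator beneath 'reducer' writes to a table
      // declared with more than one bucket.
      static boolean writesBuckets(Operator<?> reducer) throws SemanticException {
        FileSinkOperator fso =
            GenSparkUtils.getChildOperator(reducer, FileSinkOperator.class);
        if (fso == null) {
          return false;
        }
        String bucketCount = fso.getConf().getTableInfo().getProperties()
            .getProperty(hive_metastoreConstants.BUCKET_COUNT);
        return bucketCount != null && Integer.valueOf(bucketCount) > 1;
      }
    }
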
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Fri Oct 31 02:33:08 2014
@@ -18,36 +18,40 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
/**
* GenSparkWork separates the operator tree into spark tasks.
@@ -102,6 +106,26 @@ public class GenSparkWork implements Nod
SparkWork sparkWork = context.currentTask.getWork();
+
+ if (GenSparkUtils.getChildOperator(root, DummyStoreOperator.class) != null) {
+ /*
+ * SMB join case:
+ *
+ * (Big) (Small) (Small)
+ * TS TS TS
+ * \ | /
+ * \ DS DS
+ * \ | /
+ * SMBJoinOP
+ *
+ * Only create MapWork rooted at TS of big table.
+ * If there are dummy-store operators anywhere in TS's children path, then this is for the small tables.
+ * No separate map task needs to be created for a small-table TS, as it will be read by the MapWork of the big table.
+ */
+ return null;
+ }
+ SMBMapJoinOperator smbOp = (SMBMapJoinOperator) GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class);
+
// Right now the work graph is pretty simple. If there is no
// Preceding work we have a root and will generate a map
// vertex. If there is a preceding work we will generate
@@ -119,7 +143,18 @@ public class GenSparkWork implements Nod
} else {
// create a new vertex
if (context.preceedingWork == null) {
- work = utils.createMapWork(context, root, sparkWork, null);
+ if (smbOp != null) {
+ //This logic is for the SortMergeBucket MapJoin case.
+ //This MapWork (of the big table, see above) is later initialized by the SparkSortMergeJoinFactory processor, so don't initialize it here.
+ //Just keep track of it in the context, for later processing.
+ work = utils.createMapWork(context, root, sparkWork, null, true);
+ if (context.smbJoinWorkMap.get(smbOp) != null) {
+ throw new SemanticException("Each SMBMapJoin should be associated only with one MapWork");
+ }
+ context.smbJoinWorkMap.put(smbOp, (MapWork) work);
+ } else {
+ work = utils.createMapWork(context, root, sparkWork, null);
+ }
} else {
work = utils.createReduceWork(context, root, sparkWork);
}
@@ -284,9 +319,17 @@ public class GenSparkWork implements Nod
edgeProp.setShuffleSort();
}
if (rWork.getReducer() instanceof JoinOperator) {
- //reduce-side join
+ //reduce-side join, use MR-style shuffle
edgeProp.setMRShuffle();
}
+ //If it's a FileSink to bucketed files, also use MR-style shuffle to get a taskId compatible with the bucket name
+ FileSinkOperator fso = GenSparkUtils.getChildOperator(rWork.getReducer(), FileSinkOperator.class);
+ if (fso != null) {
+ String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(hive_metastoreConstants.BUCKET_COUNT);
+ if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+ edgeProp.setMRShuffle();
+ }
+ }
sparkWork.connect(work, rWork, edgeProp);
context.connectedReduceSinks.add(rs);
}
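
Condensing the first-pass rule added above: a root TableScan whose subtree contains a DummyStoreOperator is a small-table branch of an SMB join and gets no MapWork of its own, because the big table's MapWork reads it via local work. A hedged sketch of that predicate (helper name is illustrative):

    import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;

    final class SmbBranchSkipSketch {
      // A DummyStoreOperator below the root TS marks a small-table branch;
      // GenSparkWork returns early for it instead of creating a MapWork.
      static boolean isSmallTableBranch(Operator<?> root) throws SemanticException {
        return GenSparkUtils.getChildOperator(root, DummyStoreOperator.class) != null;
      }
    }
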
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Oct 31 02:33:08 2014
@@ -17,27 +17,15 @@
*/
package org.apache.hadoop.hive.ql.parse.spark;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -54,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TypeRule;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -61,8 +50,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkMapJoinOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -73,6 +61,17 @@ import org.apache.hadoop.hive.ql.plan.Mo
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
/**
* SparkCompiler translates the operator plan into SparkTasks.
*
@@ -181,6 +180,20 @@ public class SparkCompiler extends TaskC
GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
+
+
+ // ------------------- Second Pass -----------------------
+ // SMB Join optimizations to add the "localWork" and bucketing data structures to MapWork.
+ opRules.clear();
+ opRules.put(new TypeRule(SMBMapJoinOperator.class),
+ SparkSortMergeJoinFactory.getTableScanMapJoin());
+
+ disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw = new GenSparkWorkWalker(disp, procCtx);
+ ogw.startWalking(topNodes, null);
+
// we need to clone some operator plans and remove union operators still
for (BaseWork w: procCtx.workWithUnionOperators) {
GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out Fri Oct 31 02:33:08 2014
@@ -168,77 +168,51 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
- Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+ Reducer 2 <- Map 1 (GROUP SORT, 1)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
- alias: v
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Filter Operator
- predicate: name is not null (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: name (type: string)
- sort order: +
- Map-reduce partition columns: name (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- value expressions: registration (type: string)
- Map 4
- Map Operator Tree:
- TableScan
alias: s
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Filter Operator
predicate: name is not null (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: name (type: string)
- sort order: +
- Map-reduce partition columns: name (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Sorted Merge Bucket Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {name}
+ 1 {registration}
+ keys:
+ 0 name (type: string)
+ 1 name (type: string)
+ outputColumnNames: _col0, _col8
+ Select Operator
+ expressions: _col0 (type: string), _col8 (type: string)
+ outputColumnNames: _col0, _col8
+ Group By Operator
+ aggregations: count(DISTINCT _col8)
+ keys: _col0 (type: string), _col8 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string)
Reducer 2
Reduce Operator Tree:
- Join Operator
- condition map:
- Inner Join 0 to 1
- condition expressions:
- 0 {KEY.reducesinkkey0}
- 1 {VALUE._col1}
- outputColumnNames: _col0, _col8
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string), _col8 (type: string)
- outputColumnNames: _col0, _col8
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Group By Operator
- aggregations: count(DISTINCT _col8)
- keys: _col0 (type: string), _col8 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reducer 3
- Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -316,77 +290,51 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
- Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+ Reducer 2 <- Map 1 (GROUP SORT, 1)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
- alias: v
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Filter Operator
- predicate: name is not null (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: name (type: string)
- sort order: +
- Map-reduce partition columns: name (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- value expressions: registration (type: string)
- Map 4
- Map Operator Tree:
- TableScan
alias: s
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Filter Operator
predicate: name is not null (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: name (type: string)
- sort order: +
- Map-reduce partition columns: name (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Sorted Merge Bucket Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {name}
+ 1 {registration}
+ keys:
+ 0 name (type: string)
+ 1 name (type: string)
+ outputColumnNames: _col0, _col8
+ Select Operator
+ expressions: _col0 (type: string), _col8 (type: string)
+ outputColumnNames: _col0, _col8
+ Group By Operator
+ aggregations: count(DISTINCT _col8)
+ keys: _col0 (type: string), _col8 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string)
Reducer 2
Reduce Operator Tree:
- Join Operator
- condition map:
- Inner Join 0 to 1
- condition expressions:
- 0 {KEY.reducesinkkey0}
- 1 {VALUE._col1}
- outputColumnNames: _col0, _col8
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string), _col8 (type: string)
- outputColumnNames: _col0, _col8
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Group By Operator
- aggregations: count(DISTINCT _col8)
- keys: _col0 (type: string), _col8 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reducer 3
- Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -488,52 +436,22 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
- Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+ Reducer 2 <- Map 1 (GROUP SORT, 1)
#### A masked pattern was here ####
Vertices:
Map 1
- Map 4
Reducer 2
Reduce Operator Tree:
- Join Operator
- condition map:
- Inner Join 0 to 1
- condition expressions:
- 0 {KEY.reducesinkkey0}
- 1 {VALUE._col1}
- outputColumnNames: _col0, _col9
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string), _col9 (type: string)
- outputColumnNames: _col0, _col9
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Group By Operator
- aggregations: count(DISTINCT _col9)
- keys: _col0 (type: string), _col9 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reducer 3
- Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat