Posted to commits@hive.apache.org by ga...@apache.org on 2015/07/22 22:48:11 UTC
[43/50] [abbrv] hive git commit: HIVE-11303: Getting Tez LimitExceededException after dag execution on large query (Jason Dere, reviewed by Vikram Dixit)
HIVE-11303: Getting Tez LimitExceededException after dag execution on large query (Jason Dere, reviewed by Vikram Dixit)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04d54f61
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04d54f61
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04d54f61
Branch: refs/heads/hbase-metastore
Commit: 04d54f61c9f56906160936751e772080c079498c
Parents: 9904162
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jul 21 14:03:12 2015 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jul 21 14:03:12 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../test/resources/testconfiguration.properties | 4 +
.../apache/hadoop/hive/ql/exec/JoinUtil.java | 87 +-
.../hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
.../apache/hadoop/hive/ql/exec/Operator.java | 6 +
.../hive/ql/exec/tez/KeyValuesAdapter.java | 47 ++
.../hive/ql/exec/tez/KeyValuesFromKeyValue.java | 90 +++
.../ql/exec/tez/KeyValuesFromKeyValues.java | 48 ++
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 11 +-
.../hive/ql/exec/tez/ReduceRecordSource.java | 12 +-
.../ql/exec/vector/VectorMapJoinOperator.java | 1 -
.../mapjoin/VectorMapJoinCommonOperator.java | 1 +
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 213 +++--
.../hive/ql/optimizer/MapJoinProcessor.java | 44 +-
.../ql/optimizer/ReduceSinkMapJoinProc.java | 84 +-
.../hadoop/hive/ql/parse/GenTezProcContext.java | 12 +
.../hadoop/hive/ql/parse/GenTezUtils.java | 23 +-
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 81 +-
.../apache/hadoop/hive/ql/plan/BaseWork.java | 2 +-
.../hive/ql/plan/CommonMergeJoinDesc.java | 4 +
.../hadoop/hive/ql/plan/ExprNodeDescUtils.java | 115 +++
.../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 11 +
.../apache/hadoop/hive/ql/plan/ReduceWork.java | 2 +-
.../clientpositive/tez_dynpart_hashjoin_1.q | 101 +++
.../clientpositive/tez_dynpart_hashjoin_2.q | 83 ++
.../tez_vector_dynpart_hashjoin_1.q | 102 +++
.../tez_vector_dynpart_hashjoin_2.q | 84 ++
.../tez/tez_dynpart_hashjoin_1.q.out | 791 ++++++++++++++++++
.../tez/tez_dynpart_hashjoin_2.q.out | 564 +++++++++++++
.../tez/tez_vector_dynpart_hashjoin_1.q.out | 804 +++++++++++++++++++
.../tez/tez_vector_dynpart_hashjoin_2.q.out | 570 +++++++++++++
31 files changed, 3899 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 39477d6..33b67dd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1046,6 +1046,9 @@ public class HiveConf extends Configuration {
"job, process those skewed keys. The same key need not be skewed for all the tables, and so,\n" +
"the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a\n" +
"map-join."),
+ HIVEDYNAMICPARTITIONHASHJOIN("hive.optimize.dynamic.partition.hashjoin", false,
+ "Whether to enable dynamically partitioned hash join optimization. \n" +
+ "This setting is also dependent on enabling hive.auto.convert.join"),
HIVECONVERTJOIN("hive.auto.convert.join", true,
"Whether Hive enables the optimization about converting common join into mapjoin based on the input file size"),
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true,
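
A minimal sketch (not part of this commit) of how the new flag could be toggled programmatically through HiveConf; the class name is made up. As the description above notes, the optimization also depends on hive.auto.convert.join being enabled.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class DynPartHashJoinConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // The new optimization only applies when map-join conversion is also enabled.
        conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN, true);
        System.out.println("dynamic partition hashjoin enabled: "
            + conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN));
      }
    }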
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 97715fc..fbde465 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -334,6 +334,10 @@ minitez.query.files=bucket_map_join_tez1.q,\
tez_dml.q,\
tez_fsstat.q,\
tez_insert_overwrite_local_directory_1.q,\
+ tez_dynpart_hashjoin_1.q,\
+ tez_dynpart_hashjoin_2.q,\
+ tez_vector_dynpart_hashjoin_1.q,\
+ tez_vector_dynpart_hashjoin_2.q,\
tez_join_hash.q,\
tez_join_result_complex.q,\
tez_join_tests.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 7b57550..0aaa51a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.Reporter;
@@ -65,8 +66,21 @@ public class JoinUtil {
int iterate = Math.min(exprEntries.length, inputObjInspector.length);
for (byte alias = 0; alias < iterate; alias++) {
+ ObjectInspector inputOI = inputObjInspector[alias];
+
+ // For vectorized reduce-side operators getting inputs from a reduce sink,
+ // the row object inspector will get a flattened version of the object inspector
+ // where the nested key/value structs are replaced with a single struct:
+ // Example: { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } }
+ // Would get converted to the following for a vectorized input:
+ // { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. }
+ // The ExprNodeEvaluator initialization below gets broken with the flattened
+ // object inspectors, so convert it back to a form that contains the
+ // nested key/value structs.
+ inputOI = unflattenObjInspector(inputOI);
+
if (alias == (byte) posBigTableAlias ||
- exprEntries[alias] == null || inputObjInspector[alias] == null) {
+ exprEntries[alias] == null || inputOI == null) {
// skip the driver and directly loadable tables
continue;
}
@@ -74,7 +88,7 @@ public class JoinUtil {
List<ExprNodeEvaluator> exprList = exprEntries[alias];
List<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
for (int i = 0; i < exprList.size(); i++) {
- fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
+ fieldOIList.add(exprList.get(i).initialize(inputOI));
}
result[alias] = fieldOIList;
}
@@ -350,4 +364,73 @@ public class JoinUtil {
rc.setTableDesc(tblDesc);
return rc;
}
+
+ private static String KEY_FIELD_PREFIX = (Utilities.ReduceField.KEY + ".").toLowerCase();
+ private static String VALUE_FIELD_PREFIX = (Utilities.ReduceField.VALUE + ".").toLowerCase();
+
+ /**
+ * Create a new struct object inspector for the list of struct fields, first removing the
+ * prefix from the field name.
+ * @param fields
+ * @param prefixToRemove
+ * @return
+ */
+ private static ObjectInspector createStructFromFields(List<StructField> fields, String prefixToRemove) {
+ int prefixLength = prefixToRemove.length() + 1; // also remove the '.' after the prefix
+ ArrayList<String> fieldNames = new ArrayList<String>();
+ ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ for (StructField field : fields) {
+ fieldNames.add(field.getFieldName().substring(prefixLength));
+ fieldOIs.add(field.getFieldObjectInspector());
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+ }
+
+ /**
+ * Checks the input object inspector to see if it is in the form of a flattened struct
+ * like the ones generated by a vectorized reduce sink input:
+ * { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. }
+ * If so, then it creates an "unflattened" struct that contains nested key/value
+ * structs:
+ * { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } }
+ *
+ * @param oi
+ * @return unflattened object inspector if unflattening is needed,
+ * otherwise the original object inspector
+ */
+ private static ObjectInspector unflattenObjInspector(ObjectInspector oi) {
+ if (oi instanceof StructObjectInspector) {
+ // Check if all fields start with "key." or "value."
+ // If so, then unflatten by adding an additional level of nested key and value structs
+ // Example: { "key.reducesinkkey0":int, "key.reducesinkkey1": int, "value._col6":int }
+ // Becomes
+ // { "key": { "reducesinkkey0":int, "reducesinkkey1":int }, "value": { "_col6":int } }
+ ArrayList<StructField> keyFields = new ArrayList<StructField>();
+ ArrayList<StructField> valueFields = new ArrayList<StructField>();
+ for (StructField field : ((StructObjectInspector) oi).getAllStructFieldRefs()) {
+ String fieldNameLower = field.getFieldName().toLowerCase();
+ if (fieldNameLower.startsWith(KEY_FIELD_PREFIX)) {
+ keyFields.add(field);
+ } else if (fieldNameLower.startsWith(VALUE_FIELD_PREFIX)) {
+ valueFields.add(field);
+ } else {
+ // Not a flattened struct, no need to unflatten
+ return oi;
+ }
+ }
+
+ // All field names start with the "key." or "value." prefix
+ // Create key/value structs and add the respective fields to each one
+ ArrayList<ObjectInspector> reduceFieldOIs = new ArrayList<ObjectInspector>();
+ reduceFieldOIs.add(createStructFromFields(keyFields, Utilities.ReduceField.KEY.toString()));
+ reduceFieldOIs.add(createStructFromFields(valueFields, Utilities.ReduceField.VALUE.toString()));
+
+ // Finally create the outer struct to contain the key, value structs
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ Utilities.reduceFieldNameList,
+ reduceFieldOIs);
+ }
+
+ return oi;
+ }
}
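
For illustration only (not part of the patch): the unflattening that unflattenObjInspector performs can be sketched with the standard ObjectInspectorFactory APIs, using the same field names as the Javadoc example above. The wrapper class name is hypothetical; JoinUtil itself derives the outer KEY/VALUE names from Utilities.reduceFieldNameList.

    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class UnflattenSketch {
      public static void main(String[] args) {
        ObjectInspector intOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector;

        // Flattened shape produced by a vectorized reduce-sink input:
        // { "key.reducesinkkey0":int, "value._col0":int, "value._col1":int }
        StructObjectInspector flat = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("key.reducesinkkey0", "value._col0", "value._col1"),
            Arrays.asList(intOI, intOI, intOI));

        // Rebuilt nested form: { key:{reducesinkkey0:int}, value:{_col0:int, _col1:int} }
        ObjectInspector keyStruct = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("reducesinkkey0"), Arrays.asList(intOI));
        ObjectInspector valueStruct = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("_col0", "_col1"), Arrays.asList(intOI, intOI));
        ObjectInspector nested = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("KEY", "VALUE"),  // JoinUtil uses Utilities.reduceFieldNameList here
            Arrays.asList(keyStruct, valueStruct));

        System.out.println(flat.getTypeName());
        System.out.println(nested.getTypeName());
      }
    }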
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a40f0a9..1b9d7ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -154,7 +154,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
final ExecMapperContext mapContext = getExecContext();
final MapredContext mrContext = MapredContext.get();
- if (!conf.isBucketMapJoin()) {
+ if (!conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin()) {
/*
* The issue with caching in case of bucket map join is that different tasks
* process different buckets and if the container is reused to join a different bucket,
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index d7f1b42..0f02737 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1354,4 +1354,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
return childOperators;
}
}
+
+ public void removeParents() {
+ for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) {
+ removeParent(parent);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
new file mode 100644
index 0000000..8f706fe
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesAdapter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+/**
+ * Key-values interface for the Reader used by ReduceRecordSource
+ */
+public interface KeyValuesAdapter {
+ /**
+ * Get the key for the current record
+ * @return
+ * @throws IOException
+ */
+ Object getCurrentKey() throws IOException;
+
+ /**
+ * Get the values for the current record
+ * @return
+ * @throws IOException
+ */
+ Iterable<Object> getCurrentValues() throws IOException;
+
+ /**
+ * Move to the next record
+ * @return true if successful, false if there are no more records to process
+ * @throws IOException
+ */
+ boolean next() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
new file mode 100644
index 0000000..51cdeca
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValue.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Provides a key/values (note the plural values) interface out of a KeyValueReader,
+ * needed by ReduceRecordSource when reading input from a key/value source.
+ */
+public class KeyValuesFromKeyValue implements KeyValuesAdapter {
+ protected KeyValueReader reader;
+ protected ValueIterator<Object> valueIterator =
+ new ValueIterator<Object>();
+
+ private static class ValueIterator<T> implements Iterator<T>, Iterable<T> {
+
+ protected boolean hasNextValue = false;
+ protected T value = null;
+
+ @Override
+ public boolean hasNext() {
+ return hasNextValue;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNextValue) {
+ throw new NoSuchElementException();
+ }
+ hasNextValue = false;
+ return value;
+ }
+
+ void reset(T value) {
+ this.value = value;
+ hasNextValue = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return this;
+ }
+ }
+
+ public KeyValuesFromKeyValue(KeyValueReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return reader.getCurrentKey();
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ Object obj = reader.getCurrentValue();
+ valueIterator.reset(obj);
+ return valueIterator;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return reader.next();
+ }
+}
\ No newline at end of file
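
A small standalone sketch of how the adapter above presents each single key/value pair in the key/values shape ReduceRecordSource expects. The in-memory KeyValueReader stub and the class name are hypothetical; at runtime the reader comes from a Tez LogicalInput.

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.tez.KeyValuesFromKeyValue;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    public class KeyValuesAdapterSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical in-memory reader standing in for the real Tez input.
        KeyValueReader stub = new KeyValueReader() {
          private final Object[][] rows = { {"k1", 1}, {"k2", 2} };
          private int i = -1;
          @Override public boolean next() throws IOException { return ++i < rows.length; }
          @Override public Object getCurrentKey() { return rows[i][0]; }
          @Override public Object getCurrentValue() { return rows[i][1]; }
        };

        KeyValuesFromKeyValue adapter = new KeyValuesFromKeyValue(stub);
        while (adapter.next()) {
          // getCurrentValues() yields exactly one element per call to next().
          for (Object v : adapter.getCurrentValues()) {
            System.out.println(adapter.getCurrentKey() + " -> " + v);
          }
        }
      }
    }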
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
new file mode 100644
index 0000000..b027bce
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KeyValuesFromKeyValues.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Provides a key/values interface out of a KeyValuesReader for use by ReduceRecordSource.
+ */
+public class KeyValuesFromKeyValues implements KeyValuesAdapter {
+ protected KeyValuesReader reader;
+
+ public KeyValuesFromKeyValues(KeyValuesReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return reader.getCurrentKey();
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ return reader.getCurrentValues();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return reader.next();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 545d7c6..d649672 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -50,6 +50,7 @@ import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
@@ -129,10 +130,11 @@ public class ReduceRecordProcessor extends RecordProcessor{
tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork);
}
- bigTablePosition = (byte) reduceWork.getTag();
((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);
}
+ bigTablePosition = (byte) reduceWork.getTag();
+
ObjectInspector[] mainWorkOIs = null;
((TezContext) MapredContext.get()).setInputs(inputs);
((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
@@ -227,10 +229,13 @@ public class ReduceRecordProcessor extends RecordProcessor{
reducer.setParentOperators(null); // clear out any parents as reducer is the root
TableDesc keyTableDesc = redWork.getKeyDesc();
- KeyValuesReader reader = (KeyValuesReader) inputs.get(inputName).getReader();
+ Reader reader = inputs.get(inputName).getReader();
sources[tag] = new ReduceRecordSource();
- sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc,
+ // Only the big table input source should be vectorized (if applicable)
+ // Note this behavior may have to change if we ever implement a vectorized merge join
+ boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
+ sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
redWork.getVectorScratchColumnTypeMap());
ois[tag] = sources[tag].getObjectInspector();
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 20f6dba..89f7572 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
@@ -107,7 +109,7 @@ public class ReduceRecordSource implements RecordSource {
/* this is only used in the error code path */
private List<VectorExpressionWriter> valueStringWriters;
- private KeyValuesReader reader;
+ private KeyValuesAdapter reader;
private boolean handleGroupKey;
@@ -120,7 +122,7 @@ public class ReduceRecordSource implements RecordSource {
private final GroupIterator groupIterator = new GroupIterator();
void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
- TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag,
+ TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
Map<Integer, String> vectorScratchColumnTypeMap)
throws Exception {
@@ -129,7 +131,11 @@ public class ReduceRecordSource implements RecordSource {
this.reducer = reducer;
this.vectorized = vectorized;
this.keyTableDesc = keyTableDesc;
- this.reader = reader;
+ if (reader instanceof KeyValueReader) {
+ this.reader = new KeyValuesFromKeyValue((KeyValueReader) reader);
+ } else {
+ this.reader = new KeyValuesFromKeyValues((KeyValuesReader) reader);
+ }
this.handleGroupKey = handleGroupKey;
this.tag = tag;
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index e9bd44a..9bd811c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -100,7 +100,6 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
@Override
public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
-
// Use a final variable to properly parameterize the processVectorInspector closure.
// Using a member variable in the closure will not do the right thing...
final int parameterizePosBigTable = conf.getPosBigTable();
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 4c8c4b1..87ebcf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -551,6 +551,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
@Override
protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+
Collection<Future<?>> result = super.initializeOp(hconf);
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 5a87bd6..e3acdfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -93,9 +93,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (retval == null) {
return retval;
} else {
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false);
- return null;
+ fallbackToReduceSideJoin(joinOp, context);
}
}
@@ -103,27 +101,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// exact number of buckets. Else choose the largest number of estimated
// reducers from the parent operators.
int numBuckets = -1;
- int estimatedBuckets = -1;
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
- for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
- if (parentOp.getOpTraits().getNumBuckets() > 0) {
- numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
- parentOp.getOpTraits().getNumBuckets() : numBuckets;
- }
-
- if (parentOp instanceof ReduceSinkOperator) {
- ReduceSinkOperator rs = (ReduceSinkOperator) parentOp;
- estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
- rs.getConf().getNumReducers() : estimatedBuckets;
- }
- }
-
- if (numBuckets <= 0) {
- numBuckets = estimatedBuckets;
- if (numBuckets <= 0) {
- numBuckets = 1;
- }
- }
+ numBuckets = estimateNumBuckets(joinOp, true);
} else {
numBuckets = 1;
}
@@ -136,7 +115,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
} else {
// only case is full outer join with SMB enabled which is not possible. Convert to regular
// join.
- convertJoinSMBJoin(joinOp, context, 0, 0, false);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
}
@@ -155,20 +134,18 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (mapJoinConversionPos < 0) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
- MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true);
// map join operator by default has no bucket cols and num of reduce sinks
// reduced by 1
- mapJoinOp
-.setOpTraits(new OpTraits(null, -1, null));
+ mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
mapJoinOp.setStatistics(joinOp.getStatistics());
// propagate this change till the next RS
for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
- setAllChildrenTraitsToNull(childOp);
+ setAllChildrenTraits(childOp, mapJoinOp.getOpTraits());
}
return null;
@@ -180,7 +157,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// we cannot convert to bucket map join, we cannot convert to
// map join either based on the size. Check if we can convert to SMB join.
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
- convertJoinSMBJoin(joinOp, context, 0, 0, false);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -209,8 +186,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// contains aliases from sub-query
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -220,8 +196,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
} else {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false);
+ fallbackToReduceSideJoin(joinOp, context);
}
return null;
}
@@ -317,16 +292,16 @@ public class ConvertJoinMapJoin implements NodeProcessor {
mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
}
- private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
+ private void setAllChildrenTraits(Operator<? extends OperatorDesc> currentOp, OpTraits opTraits) {
if (currentOp instanceof ReduceSinkOperator) {
return;
}
- currentOp.setOpTraits(new OpTraits(null, -1, null));
+ currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), opTraits.getNumBuckets(), opTraits.getSortCols()));
for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
break;
}
- setAllChildrenTraitsToNull(childOp);
+ setAllChildrenTraits(childOp, opTraits);
}
}
@@ -338,7 +313,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return false;
}
- MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition, true);
MapJoinDesc joinDesc = mapJoinOp.getConf();
joinDesc.setBucketMapJoin(true);
@@ -633,7 +608,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
*/
public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int bigTablePosition) throws SemanticException {
+ int bigTablePosition, boolean removeReduceSink) throws SemanticException {
// bail on mux operator because currently the mux operator masks the emit keys
// of the constituent reduce sinks.
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
@@ -646,45 +621,49 @@ public class ConvertJoinMapJoin implements NodeProcessor {
MapJoinOperator mapJoinOp =
MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
- joinOp.getConf().getMapAliases(), bigTablePosition, true);
+ joinOp.getConf().getMapAliases(), bigTablePosition, true, removeReduceSink);
mapJoinOp.getConf().setHybridHashJoin(HiveConf.getBoolVar(context.conf,
- HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
+ HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
- for (Operator<?> p : parentBigTableOp.getParentOperators()) {
- // we might have generated a dynamic partition operator chain. Since
- // we're removing the reduce sink we need do remove that too.
- Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
- Map<Operator<?>, AppMasterEventOperator> opEventPairs = new HashMap<>();
- for (Operator<?> c : p.getChildOperators()) {
- AppMasterEventOperator event = findDynamicPartitionBroadcast(c);
- if (event != null) {
- dynamicPartitionOperators.add(c);
- opEventPairs.put(c, event);
+ if (removeReduceSink) {
+ for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+ // we might have generated a dynamic partition operator chain. Since
+ // we're removing the reduce sink we need to remove that too.
+ Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
+ Map<Operator<?>, AppMasterEventOperator> opEventPairs = new HashMap<>();
+ for (Operator<?> c : p.getChildOperators()) {
+ AppMasterEventOperator event = findDynamicPartitionBroadcast(c);
+ if (event != null) {
+ dynamicPartitionOperators.add(c);
+ opEventPairs.put(c, event);
+ }
}
- }
- for (Operator<?> c : dynamicPartitionOperators) {
- if (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
- !context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) {
- p.removeChild(c);
- // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
- LOG.info("Disabling dynamic pruning for: "
- + ((DynamicPruningEventDesc) opEventPairs.get(c).getConf()).getTableScan().getName()
- + ". Need to be removed together with reduce sink");
+ for (Operator<?> c : dynamicPartitionOperators) {
+ if (context.pruningOpsRemovedByPriorOpt.isEmpty() ||
+ !context.pruningOpsRemovedByPriorOpt.contains(opEventPairs.get(c))) {
+ p.removeChild(c);
+ // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+ LOG.info("Disabling dynamic pruning for: "
+ + ((DynamicPruningEventDesc) opEventPairs.get(c).getConf()).getTableScan().getName()
+ + ". Need to be removed together with reduce sink");
+ }
+ }
+ for (Operator<?> op : dynamicPartitionOperators) {
+ context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op));
}
}
- for (Operator<?> op : dynamicPartitionOperators) {
- context.pruningOpsRemovedByPriorOpt.add(opEventPairs.get(op));
+
+ mapJoinOp.getParentOperators().remove(bigTablePosition);
+ if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
+ mapJoinOp.getParentOperators().add(bigTablePosition,
+ parentBigTableOp.getParentOperators().get(0));
}
+ parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
}
- mapJoinOp.getParentOperators().remove(bigTablePosition);
- if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
- mapJoinOp.getParentOperators().add(bigTablePosition,
- parentBigTableOp.getParentOperators().get(0));
- }
- parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
+
for (Operator<? extends OperatorDesc>op : mapJoinOp.getParentOperators()) {
if (!(op.getChildOperators().contains(mapJoinOp))) {
op.getChildOperators().add(mapJoinOp);
@@ -720,4 +699,100 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return null;
}
+
+ /**
+ * Estimate the number of buckets in the join, using the parent operators' OpTraits and/or
+ * parent operators' number of reducers
+ * @param joinOp
+ * @param useOpTraits Whether OpTraits should be used for the estimate.
+ * @return
+ */
+ private static int estimateNumBuckets(JoinOperator joinOp, boolean useOpTraits) {
+ int numBuckets = -1;
+ int estimatedBuckets = -1;
+
+ for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
+ if (parentOp.getOpTraits().getNumBuckets() > 0) {
+ numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+ parentOp.getOpTraits().getNumBuckets() : numBuckets;
+ }
+
+ if (parentOp instanceof ReduceSinkOperator) {
+ ReduceSinkOperator rs = (ReduceSinkOperator) parentOp;
+ estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+ rs.getConf().getNumReducers() : estimatedBuckets;
+ }
+ }
+
+ if (!useOpTraits) {
+ // Ignore the value we got from OpTraits.
+ // The logic below will fall back to the estimate from numReducers
+ numBuckets = -1;
+ }
+
+ if (numBuckets <= 0) {
+ numBuckets = estimatedBuckets;
+ if (numBuckets <= 0) {
+ numBuckets = 1;
+ }
+ }
+
+ return numBuckets;
+ }
+
+ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+ throws SemanticException {
+ // Attempt dynamic partitioned hash join
+ // Since we don't have the big table index yet, we must start with an estimate of numReducers
+ int numReducers = estimateNumBuckets(joinOp, false);
+ LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
+ int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+ if (bigTablePos >= 0) {
+ // Now that we have the big table index, get real numReducers value based on big table RS
+ ReduceSinkOperator bigTableParentRS =
+ (ReduceSinkOperator) (joinOp.getParentOperators().get(bigTablePos));
+ numReducers = bigTableParentRS.getConf().getNumReducers();
+ LOG.debug("Real big table reducers = " + numReducers);
+
+ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePos, false);
+ if (mapJoinOp != null) {
+ LOG.info("Selected dynamic partitioned hash join");
+ mapJoinOp.getConf().setDynamicPartitionHashJoin(true);
+ // Set OpTraits for dynamically partitioned hash join:
+ // bucketColNames: Re-use previous joinOp's bucketColNames. Parent operators should be
+ // reduce sink, which should have bucket columns based on the join keys.
+ // numBuckets: set to number of reducers
+ // sortCols: This is an unsorted join - no sort cols
+ OpTraits opTraits = new OpTraits(
+ joinOp.getOpTraits().getBucketColNames(),
+ numReducers,
+ null);
+ mapJoinOp.setOpTraits(opTraits);
+ mapJoinOp.setStatistics(joinOp.getStatistics());
+ // propagate this change till the next RS
+ for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
+ setAllChildrenTraits(childOp, mapJoinOp.getOpTraits());
+ }
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+ throws SemanticException {
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
+ context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
+ if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+ return;
+ }
+ }
+
+ // we are just converting to a common merge join operator. The shuffle
+ // join in map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ LOG.info("Fallback to common merge join operator");
+ convertJoinSMBJoin(joinOp, context, pos, 0, false);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 4d84f0f..f8f2b7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -29,6 +29,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -89,6 +92,7 @@ public class MapJoinProcessor implements Transform {
// (column type + column name). The column name is not really used anywhere, but it
// needs to be passed. Use the string defined below for that.
private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
+ private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
public MapJoinProcessor() {
}
@@ -356,11 +360,18 @@ public class MapJoinProcessor implements Transform {
public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ return convertJoinOpMapJoinOp(hconf, op, leftInputJoin, baseSrc, mapAliases,
+ mapJoinPos, noCheckOuterJoin, true);
+ }
+
+ public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren)
+ throws SemanticException {
MapJoinDesc mapJoinDescriptor =
getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases,
- mapJoinPos, noCheckOuterJoin);
-
+ mapJoinPos, noCheckOuterJoin, adjustParentsChildren);
// reduce sink row resolver used to generate map join op
RowSchema outputRS = op.getSchema();
@@ -1025,7 +1036,7 @@ public class MapJoinProcessor implements Transform {
public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
- int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren) throws SemanticException {
JoinDesc desc = op.getConf();
JoinCondDesc[] condns = desc.getConds();
Byte[] tagOrder = desc.getTagOrder();
@@ -1072,6 +1083,26 @@ public class MapJoinProcessor implements Transform {
// get the join keys from old parent ReduceSink operators
Map<Byte, List<ExprNodeDesc>> keyExprMap = pair.getSecond();
+ if (!adjustParentsChildren) {
+ // Since we did not remove reduce sink parents, keep the original value expressions
+ newValueExprs = valueExprs;
+
+ // Join key exprs are represented in terms of the original table columns,
+ // we need to convert these to the generated column names we can see in the Join operator
+ Map<Byte, List<ExprNodeDesc>> newKeyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> mapEntry : keyExprMap.entrySet()) {
+ Byte pos = mapEntry.getKey();
+ ReduceSinkOperator rsParent = oldReduceSinkParentOps.get(pos.byteValue());
+ List<ExprNodeDesc> keyExprList =
+ ExprNodeDescUtils.resolveJoinKeysAsRSColumns(mapEntry.getValue(), rsParent);
+ if (keyExprList == null) {
+ throw new SemanticException("Error resolving join keys");
+ }
+ newKeyExprMap.put(pos, keyExprList);
+ }
+ keyExprMap = newKeyExprMap;
+ }
+
// construct valueTableDescs and valueFilteredTableDescs
List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
@@ -1163,4 +1194,11 @@ public class MapJoinProcessor implements Transform {
return mapJoinDescriptor;
}
+
+ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+ JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
+ int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ return getMapJoinDesc(hconf, op, leftInputJoin, baseSrc,
+ mapAliases, mapJoinPos, noCheckOuterJoin, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index bca91dd..b546838 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -58,11 +61,13 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import com.google.common.collect.Sets;
+
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;
public class ReduceSinkMapJoinProc implements NodeProcessor {
- protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+ private final static Log LOG = LogFactory.getLog(ReduceSinkMapJoinProc.class.getName());
/* (non-Javadoc)
* This processor addresses the RS-MJ case that occurs in tez on the small/hash
@@ -79,7 +84,40 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
GenTezProcContext context = (GenTezProcContext) procContext;
MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
- if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator)) {
+ // remember the original parent list before we start modifying it.
+ if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
+ List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
+ context.mapJoinParentMap.put(mapJoinOp, parents);
+ }
+
+ boolean isBigTable = stack.size() < 2
+ || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator);
+
+ ReduceSinkOperator parentRS = null;
+ if (!isBigTable) {
+ parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+
+ // For dynamic partitioned hash join, the big table will also be coming from a ReduceSinkOperator
+ // Check for this condition.
+ // TODO: use indexOf(), or parentRS.getTag()?
+ isBigTable =
+ (mapJoinOp.getParentOperators().indexOf(parentRS) == mapJoinOp.getConf().getPosBigTable());
+ }
+
+ if (mapJoinOp.getConf().isDynamicPartitionHashJoin() &&
+ !context.mapJoinToUnprocessedSmallTableReduceSinks.containsKey(mapJoinOp)) {
+ // Initialize set of unprocessed small tables
+ Set<ReduceSinkOperator> rsSet = Sets.newIdentityHashSet();
+ for (int pos = 0; pos < mapJoinOp.getParentOperators().size(); ++pos) {
+ if (pos == mapJoinOp.getConf().getPosBigTable()) {
+ continue;
+ }
+ rsSet.add((ReduceSinkOperator) mapJoinOp.getParentOperators().get(pos));
+ }
+ context.mapJoinToUnprocessedSmallTableReduceSinks.put(mapJoinOp, rsSet);
+ }
+
+ if (isBigTable) {
context.currentMapJoinOperators.add(mapJoinOp);
return null;
}
@@ -87,14 +125,29 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
context.preceedingWork = null;
context.currentRootOperator = null;
- ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+ return processReduceSinkToHashJoin(parentRS, mapJoinOp, context);
+ }
+
+ public static BaseWork getMapJoinParentWork(GenTezProcContext context, Operator<?> parentRS) {
+ BaseWork parentWork;
+ if (context.unionWorkMap.containsKey(parentRS)) {
+ parentWork = context.unionWorkMap.get(parentRS);
+ } else {
+ assert context.childToWorkMap.get(parentRS).size() == 1;
+ parentWork = context.childToWorkMap.get(parentRS).get(0);
+ }
+ return parentWork;
+ }
+
+ public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, MapJoinOperator mapJoinOp,
+ GenTezProcContext context) throws SemanticException {
// remove the tag for in-memory side of mapjoin
parentRS.getConf().setSkipTag(true);
parentRS.setSkipTag(true);
- // remember the original parent list before we start modifying it.
- if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
- List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
- context.mapJoinParentMap.put(mapJoinOp, parents);
+
+ // Mark this small table as being processed
+ if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+ context.mapJoinToUnprocessedSmallTableReduceSinks.get(mapJoinOp).remove(parentRS);
}
List<BaseWork> mapJoinWork = null;
@@ -109,13 +162,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
*
*/
mapJoinWork = context.mapJoinWorkMap.get(mapJoinOp);
- BaseWork parentWork;
- if (context.unionWorkMap.containsKey(parentRS)) {
- parentWork = context.unionWorkMap.get(parentRS);
- } else {
- assert context.childToWorkMap.get(parentRS).size() == 1;
- parentWork = context.childToWorkMap.get(parentRS).get(0);
- }
+ BaseWork parentWork = getMapJoinParentWork(context, parentRS);
// set the link between mapjoin and parent vertex
int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
@@ -161,6 +208,11 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
keyCount /= bucketCount;
tableSize /= bucketCount;
}
+ } else if (joinConf.isDynamicPartitionHashJoin()) {
+ // For dynamic partitioned hash join, assume the table is split evenly among the reduce tasks.
+ bucketCount = parentRS.getConf().getNumReducers();
+ keyCount /= bucketCount;
+ tableSize /= bucketCount;
}
}
LOG.info("Mapjoin " + mapJoinOp + ", pos: " + pos + " --> " + parentWork.getName() + " ("
@@ -218,6 +270,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
}
}
+ } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+ edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
}
TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
@@ -232,7 +286,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
}
ReduceSinkOperator r = null;
- if (parentRS.getConf().getOutputName() != null) {
+ if (context.connectedReduceSinks.contains(parentRS)) {
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
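
Back-of-the-envelope view of the new sizing branch above, as a standalone sketch (the numbers are invented and the class name is made up): with the small table split evenly across the reduce tasks, the key count and table size used for hash-table sizing shrink by the reducer count.

    public class DynHashJoinSizingSketch {
      public static void main(String[] args) {
        long keyCount = 10_000_000L;      // hypothetical small-table key estimate
        long tableSize = 2_000_000_000L;  // hypothetical small-table size in bytes
        int numReducers = 50;             // parentRS.getConf().getNumReducers() in the patch

        // Mirrors the new branch: each reducer only hosts its share of the small table.
        long bucketCount = numReducers;
        keyCount /= bucketCount;
        tableSize /= bucketCount;

        System.out.println("per-reducer keys  = " + keyCount);   // 200000
        System.out.println("per-reducer bytes = " + tableSize);  // 40000000
      }
    }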
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index f474eae..9334c73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -107,6 +108,10 @@ public class GenTezProcContext implements NodeProcessorCtx{
// map that says which mapjoin belongs to which work item
public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
+ // Mapping of reducesink to mapjoin operators
+ // Only used for dynamic partitioned hash joins (mapjoin operator in the reducer)
+ public final Map<Operator<?>, MapJoinOperator> smallTableParentToMapJoinMap;
+
// a map to keep track of which root generated which work
public final Map<Operator<?>, BaseWork> rootToWorkMap;
@@ -151,6 +156,11 @@ public class GenTezProcContext implements NodeProcessorCtx{
// remember the connections between ts and event
public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+ // When processing dynamic partitioned hash joins, some of the small tables may not get processed
+ // before the mapjoin's parents are removed during GenTezWork.process(). This is to keep
+ // track of which small tables haven't been processed yet.
+ public Map<MapJoinOperator, Set<ReduceSinkOperator>> mapJoinToUnprocessedSmallTableReduceSinks;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -167,6 +177,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>();
this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+ this.smallTableParentToMapJoinMap = new LinkedHashMap<Operator<?>, MapJoinOperator>();
this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();
@@ -188,6 +199,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>();
this.currentMergeJoinOperator = null;
+ this.mapJoinToUnprocessedSmallTableReduceSinks = new HashMap<MapJoinOperator, Set<ReduceSinkOperator>>();
rootTasks.add(currentTask);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 93ad145..a9d1f8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -130,12 +131,13 @@ public class GenTezUtils {
tezWork.add(reduceWork);
TezEdgeProperty edgeProp;
+ EdgeType edgeType = determineEdgeType(context.preceedingWork, reduceWork);
if (reduceWork.isAutoReduceParallelism()) {
edgeProp =
- new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ new TezEdgeProperty(context.conf, edgeType, true,
reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
} else {
- edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ edgeProp = new TezEdgeProperty(edgeType);
}
tezWork.connect(
@@ -470,4 +472,21 @@ public class GenTezUtils {
curr.removeChild(child);
}
+
+ public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork) {
+ if (followingWork instanceof ReduceWork) {
+ // Ideally there should be a better way to determine that the followingWork contains
+ // a dynamic partitioned hash join, but in some cases (createReduceWork()) it looks like
+ // the work must be created/connected first, before the GenTezProcContext can be updated
+ // with the mapjoin/work relationship.
+ ReduceWork reduceWork = (ReduceWork) followingWork;
+ if (reduceWork.getReducer() instanceof MapJoinOperator) {
+ MapJoinOperator joinOp = (MapJoinOperator) reduceWork.getReducer();
+ if (joinOp.getConf().isDynamicPartitionHashJoin()) {
+ return EdgeType.CUSTOM_SIMPLE_EDGE;
+ }
+ }
+ }
+ return EdgeType.SIMPLE_EDGE;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6b3e19d..c4e0413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -165,8 +166,11 @@ public class GenTezWork implements NodeProcessor {
mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork);
Operator<? extends OperatorDesc> parentOp =
getParentFromStack(context.currentMergeJoinOperator, stack);
+ // Set the big table position. Both the reduce work and the merge join operator
+ // should be set to the same value.
int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
work.setTag(pos);
+ context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
for (BaseWork parentWork : tezWork.getParents(work)) {
TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
@@ -190,6 +194,50 @@ public class GenTezWork implements NodeProcessor {
// remember which mapjoin operator links with which work
if (!context.currentMapJoinOperators.isEmpty()) {
for (MapJoinOperator mj: context.currentMapJoinOperators) {
+ // For a dynamic partitioned hash join, the ReduceSinkMapJoinProc rule may not get run
+ // for all of the ReduceSink parents, because the parents of the MapJoin operator get
+ // removed later on in this method. Keep track of the parent-to-mapjoin mapping
+ // so we can later run the same logic that ReduceSinkMapJoinProc runs.
+ if (mj.getConf().isDynamicPartitionHashJoin()) {
+ // Since this is a dynamic partitioned hash join, the work for this join should be a ReduceWork
+ ReduceWork reduceWork = (ReduceWork) work;
+ int bigTablePosition = mj.getConf().getPosBigTable();
+ reduceWork.setTag(bigTablePosition);
+
+ // Use context.mapJoinParentMap to get the original RS parents, because
+ // the MapJoin's parents may have been replaced by dummy operator.
+ List<Operator<?>> mapJoinOriginalParents = context.mapJoinParentMap.get(mj);
+ if (mapJoinOriginalParents == null) {
+ throw new SemanticException("Unexpected error - context.mapJoinParentMap did not have an entry for " + mj);
+ }
+ for (int pos = 0; pos < mapJoinOriginalParents.size(); ++pos) {
+ // This processing only needs to happen for the small tables
+ if (pos == bigTablePosition) {
+ continue;
+ }
+ Operator<?> parentOp = mapJoinOriginalParents.get(pos);
+ context.smallTableParentToMapJoinMap.put(parentOp, mj);
+
+ ReduceSinkOperator parentRS = (ReduceSinkOperator) parentOp;
+
+ // TableDesc needed for dynamic partitioned hash join
+ GenMapRedUtils.setKeyAndValueDesc(reduceWork, parentRS);
+
+ // For small table RS parents that have already been processed, we need to record
+ // the mapping from the RS tag to the parent work's name in the reduce work that
+ // contains this map join. This was not done for normal mapjoins, where the small
+ // table typically has its ReduceSink parent removed.
+ if (!context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(parentRS)) {
+ // This reduce sink has been processed already, so the work for the parentRS exists
+ BaseWork parentWork = ReduceSinkMapJoinProc.getMapJoinParentWork(context, parentRS);
+ int tag = parentRS.getConf().getTag();
+ tag = (tag == -1 ? 0 : tag);
+ reduceWork.getTagToInput().put(tag, parentWork.getName());
+ }
+
+ }
+ }
+
LOG.debug("Processing map join: " + mj);
// remember the mapping in case we scan another branch of the
// mapjoin later
@@ -369,15 +417,44 @@ public class GenTezWork implements NodeProcessor {
// remember the output name of the reduce sink
rs.getConf().setOutputName(rWork.getName());
+ // For dynamic partitioned hash join, run the ReduceSinkMapJoinProc logic for any
+ // ReduceSink parents that we missed.
+ MapJoinOperator mj = context.smallTableParentToMapJoinMap.get(rs);
+ if (mj != null) {
+ // Only need to run the logic for tables we missed
+ if (context.mapJoinToUnprocessedSmallTableReduceSinks.get(mj).contains(rs)) {
+ // ReduceSinkMapJoinProc logic does not work unless the ReduceSink is connected as
+ // a parent of the MapJoin, but at this point we have already removed all of the
+ // parents from the MapJoin.
+ // Try temporarily adding the RS as a parent
+ ArrayList<Operator<?>> tempMJParents = new ArrayList<Operator<?>>();
+ tempMJParents.add(rs);
+ mj.setParentOperators(tempMJParents);
+ // ReduceSink also needs MapJoin as child
+ List<Operator<?>> rsChildren = rs.getChildOperators();
+ rsChildren.add(mj);
+
+ // Since the MapJoin has had all of its other parents removed at this point,
+ // processReduceSinkToHashJoin() must not rely on the RS parent's position
+ // in the list of parents here.
+ ReduceSinkMapJoinProc.processReduceSinkToHashJoin(rs, mj, context);
+
+ // Remove any parents from MapJoin again
+ mj.removeParents();
+ // TODO: do we also need to remove the MapJoin from the list of RS's children?
+ }
+ }
+
if (!context.connectedReduceSinks.contains(rs)) {
// add dependency between the two work items
TezEdgeProperty edgeProp;
+ EdgeType edgeType = utils.determineEdgeType(work, followingWork);
if (rWork.isAutoReduceParallelism()) {
edgeProp =
- new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ new TezEdgeProperty(context.conf, edgeType, true,
rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
} else {
- edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ edgeProp = new TezEdgeProperty(edgeType);
}
tezWork.connect(work, followingWork, edgeProp);
context.connectedReduceSinks.add(rs);
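The tagToInput bookkeeping added above is what lets the reduce-side map join locate each small-table input at runtime: for every small-table ReduceSink, the reduce work hosting the join records which parent vertex delivers that tag. A minimal, self-contained sketch of that mapping follows (vertex names and the big table position are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class TagToInputDemo {
      public static void main(String[] args) {
        int bigTablePosition = 0;                       // big table arrives on tag 0 in this sketch
        String[] parentWorkNames = {"Map 1", "Map 2", "Map 3"};
        Map<Integer, String> tagToInput = new HashMap<>();
        for (int tag = 0; tag < parentWorkNames.length; tag++) {
          if (tag == bigTablePosition) {
            continue;                                   // only small-table inputs are recorded here
          }
          tagToInput.put(tag, parentWorkNames[tag]);
        }
        System.out.println(tagToInput);                 // {1=Map 2, 2=Map 3}
      }
    }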
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index fa697ef..d574c5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -47,7 +47,7 @@ public abstract class BaseWork extends AbstractOperatorDesc {
// Their function is mainly as root ops to give the mapjoin the correct
// schema info.
List<HashTableDummyOperator> dummyOps;
- int tag;
+ int tag = 0;
private final List<String> sortColNames = new ArrayList<String>();
private MapredLocalWork mrLocalWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
index f9c34cb..cce9bc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
@@ -45,4 +45,8 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable {
public int getBigTablePosition() {
return mapJoinConversionPos;
}
+
+ public void setBigTablePosition(int pos) {
+ mapJoinConversionPos = pos;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index fb3c4a3..e291a48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagateProcFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
@@ -278,6 +279,59 @@ public class ExprNodeDescUtils {
throw new SemanticException("Met multiple parent operators");
}
+ public static List<ExprNodeDesc> resolveJoinKeysAsRSColumns(List<ExprNodeDesc> sourceList,
+ Operator<?> reduceSinkOp) {
+ ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size());
+ for (ExprNodeDesc source : sourceList) {
+ ExprNodeDesc newExpr = resolveJoinKeysAsRSColumns(source, reduceSinkOp);
+ if (newExpr == null) {
+ return null;
+ }
+ result.add(newExpr);
+ }
+ return result;
+ }
+
+ /**
+ * Join keys are expressions based on the select operator. Resolve the expressions so they
+ * are based on the ReduceSink operator:
+ * SEL -> RS -> JOIN
+ * @param source join key expression in terms of the operator feeding the ReduceSink
+ * @param reduceSinkOp the ReduceSink operator whose output columns the key should reference
+ * @return the resolved expression, or null if no matching ReduceSink output column is found
+ */
+ public static ExprNodeDesc resolveJoinKeysAsRSColumns(ExprNodeDesc source, Operator<?> reduceSinkOp) {
+ // Assuming this is only being done for join keys. As a result we shouldn't have to recursively
+ // check any nested child expressions, because the result of the expression should exist as an
+ // output column of the ReduceSink operator
+ if (source == null) {
+ return null;
+ }
+
+ // columnExprMap has the reverse of what we need - a mapping of the internal column names
+ // to the ExprNodeDesc from the previous operator.
+ // Find the entry whose ExprNodeDesc value matches the expression we are searching for.
+ // The key of that entry is the internal column name for the join key expression.
+ for (Map.Entry<String, ExprNodeDesc> mapEntry : reduceSinkOp.getColumnExprMap().entrySet()) {
+ if (mapEntry.getValue().isSame(source)) {
+ String columnInternalName = mapEntry.getKey();
+ if (source instanceof ExprNodeColumnDesc) {
+ // The join key is a table column. Create the ExprNodeDesc based on this column.
+ ColumnInfo columnInfo = reduceSinkOp.getSchema().getColumnInfo(columnInternalName);
+ return new ExprNodeColumnDesc(columnInfo);
+ } else {
+ // Join key expression is likely some expression involving functions/operators, so there
+ // is no actual table column for this. But the ReduceSink operator should still have an
+ // output column corresponding to this expression, using the columnInternalName.
+ // TODO: does tableAlias matter for this kind of expression?
+ return new ExprNodeColumnDesc(source.getTypeInfo(), columnInternalName, "", false);
+ }
+ }
+ }
+
+ return null; // Couldn't find reference to expression
+ }
+
public static ExprNodeDesc[] extractComparePair(ExprNodeDesc expr1, ExprNodeDesc expr2) {
expr1 = extractConstant(expr1);
expr2 = extractConstant(expr2);
@@ -483,4 +537,65 @@ public class ExprNodeDescUtils {
return exprColLst;
}
+
+ public static List<ExprNodeDesc> flattenExprList(List<ExprNodeDesc> sourceList) {
+ ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>(sourceList.size());
+ for (ExprNodeDesc source : sourceList) {
+ result.add(flattenExpr(source));
+ }
+ return result;
+ }
+
+ /**
+ * A normal reduce operator's rowObjectInspector looks like a struct containing
+ * nested key/value structs that contain the column values:
+ * { key: { reducesinkkey0:int }, value: { _col0:int, _col1:int, .. } }
+ *
+ * For vectorized queries the rowObjectInspector looks the same at compile time,
+ * but within the tasks at query execution it has been flattened to a structure
+ * without nested key/value structs:
+ * { 'key.reducesinkkey0':int, 'value._col0':int, 'value._col1':int, .. }
+ *
+ * Trying to fetch 'key.reducesinkkey0' by name from the list of flattened
+ * ObjectInspectors does not work because the '.' gets interpreted as a field member,
+ * even though it is a flattened list of column values.
+ * This workaround converts the column name referenced in the ExprNodeDesc
+ * from a nested field name (key.reducesinkkey0) to key_reducesinkkey0,
+ * simply by replacing '.' with '_'.
+ * @param source expression whose column references may use nested names such as key.reducesinkkey0
+ * @return the equivalent expression with flattened column names, or null if it could not be resolved
+ */
+ public static ExprNodeDesc flattenExpr(ExprNodeDesc source) {
+ if (source instanceof ExprNodeGenericFuncDesc) {
+ // all child expressions should be resolved
+ ExprNodeGenericFuncDesc function = (ExprNodeGenericFuncDesc) source.clone();
+ List<ExprNodeDesc> newChildren = flattenExprList(function.getChildren());
+ for (ExprNodeDesc newChild : newChildren) {
+ if (newChild == null) {
+ // Could not resolve all of the function children, fail
+ return null;
+ }
+ }
+ function.setChildren(newChildren);
+ return function;
+ }
+ if (source instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc column = (ExprNodeColumnDesc) source;
+ // Create a new ExprNodeColumnDesc, replacing STRUCT.COLUMN with STRUCT_COLUMN in the name
+ String newColumn = column.getColumn().replace('.', '_');
+ return new ExprNodeColumnDesc(source.getTypeInfo(), newColumn, column.getTabAlias(), false);
+ }
+ if (source instanceof ExprNodeFieldDesc) {
+ // field expression should be resolved
+ ExprNodeFieldDesc field = (ExprNodeFieldDesc) source.clone();
+ ExprNodeDesc fieldDesc = flattenExpr(field.getDesc());
+ if (fieldDesc == null) {
+ return null;
+ }
+ field.setDesc(fieldDesc);
+ return field;
+ }
+ // constant or null expr, just return
+ return source;
+ }
}
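To make the renaming described in the flattenExpr() javadoc concrete, here is a tiny self-contained demo of the rewrite on a few typical reduce-side column names (the helper below is illustrative, not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    public class FlattenNameDemo {
      // Same rewrite flattenExpr() applies to column references: '.' becomes '_'.
      static String flattenColumnName(String name) {
        return name.replace('.', '_');
      }

      public static void main(String[] args) {
        List<String> reduceSideColumns = Arrays.asList("key.reducesinkkey0", "value._col0", "value._col1");
        for (String column : reduceSideColumns) {
          // key.reducesinkkey0 -> key_reducesinkkey0, value._col0 -> value__col0, ...
          System.out.println(column + " -> " + flattenColumnName(column));
        }
      }
    }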
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index cee9100..e27b89b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -29,6 +29,8 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
/**
* Map Join operator Descriptor implementation.
*
@@ -71,6 +73,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
protected boolean genJoinKeys = true;
private boolean isHybridHashJoin;
+ private boolean isDynamicPartitionHashJoin = false;
// Extra parameters only for vectorization.
private VectorMapJoinDesc vectorDesc;
@@ -369,4 +372,12 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
public boolean getGenJoinKeys() {
return genJoinKeys;
}
+
+ public boolean isDynamicPartitionHashJoin() {
+ return isDynamicPartitionHashJoin;
+ }
+
+ public void setDynamicPartitionHashJoin(boolean isDynamicPartitionHashJoin) {
+ this.isDynamicPartitionHashJoin = isDynamicPartitionHashJoin;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index a78a92e..020d6de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -136,7 +136,7 @@ public class ReduceWork extends BaseWork {
return null;
}
if (valueObjectInspector == null) {
- valueObjectInspector = getObjectInspector(tagToValueDesc.get(0));
+ valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag));
}
return valueObjectInspector;
}
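This one-line change matters for the dynamically partitioned hash join case: the reduce work running the join is tagged with the big table position, which may not be 0, so the value ObjectInspector has to be resolved from the work's own tag. A toy sketch of why indexing with a constant 0 would pick the wrong value descriptor (list contents are hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class TagLookupDemo {
      public static void main(String[] args) {
        // Pretend value descriptors, indexed by input tag.
        List<String> tagToValueDesc = Arrays.asList("small table values", "big table values");
        int tag = 1;                                 // e.g. the big table arrives on tag 1
        System.out.println(tagToValueDesc.get(0));   // small table values (the old, wrong lookup)
        System.out.println(tagToValueDesc.get(tag)); // big table values (lookup by the work's tag)
      }
    }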
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
new file mode 100644
index 0000000..e3325c4
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_1.q
@@ -0,0 +1,101 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- First try with regular mergejoin
+explain
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=200000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+
+-- Try with dynamically partitioned hashjoin
+explain
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
new file mode 100644
index 0000000..af4e2b8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_2.q
@@ -0,0 +1,83 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- Multiple tables, and change the order of the big table (alltypesorc)
+-- First try with regular mergejoin
+explain
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+
+-- noconditionaltask.size needs to be low enough that the entire filtered table result does not fit in one task's hash table
+-- Try with dynamically partitioned hash join
+explain
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+-- Try different order of tables
+explain
+select
+ a.*
+from
+ src b,
+ alltypesorc a,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ src b,
+ alltypesorc a,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
new file mode 100644
index 0000000..65fee16
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
@@ -0,0 +1,102 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- First try with regular mergejoin
+explain
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=200000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+set hive.vectorized.execution.enabled=true;
+
+-- Try with dynamically partitioned hashjoin
+explain
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+select
+ *
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+order by a.cint;
+
+explain
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+select
+ count(*)
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null;
+
+explain
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
+
+select
+ a.csmallint, count(*) c1
+from alltypesorc a join alltypesorc b on a.cint = b.cint
+where
+ a.cint between 1000000 and 3000000 and b.cbigint is not null
+group by a.csmallint
+order by c1;
http://git-wip-us.apache.org/repos/asf/hive/blob/04d54f61/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
new file mode 100644
index 0000000..606f455
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_2.q
@@ -0,0 +1,84 @@
+
+set hive.explain.user=false;
+set hive.auto.convert.join=false;
+set hive.optimize.dynamic.partition.hashjoin=false;
+
+-- Multiple tables, and change the order of the big table (alltypesorc)
+-- First try with regular mergejoin
+explain
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=true;
+set hive.auto.convert.join.noconditionaltask.size=2000;
+set hive.exec.reducers.bytes.per.reducer=200000;
+set hive.vectorized.execution.enabled=true;
+
+-- noconditionaltask.size needs to be low enough that the entire filtered table result does not fit in one task's hash table
+-- Try with dynamically partitioned hash join
+explain
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ alltypesorc a,
+ src b,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+-- Try different order of tables
+explain
+select
+ a.*
+from
+ src b,
+ alltypesorc a,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;
+
+select
+ a.*
+from
+ src b,
+ alltypesorc a,
+ src c
+where
+ a.csmallint = cast(b.key as int) and a.csmallint = (cast(c.key as int) + 0)
+ and (a.csmallint < 100)
+order by a.csmallint, a.ctinyint, a.cint;