You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/01 05:32:52 UTC
[07/10] hive git commit: HIVE-12290 Native Vector ReduceSink (Matt
McCline, reviewed by Gopal V)
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
new file mode 100644
index 0000000..bf0a25b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.keyseries;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import com.google.common.base.Preconditions;
+
+/**
+ * Shared implementation for a key series over a single key or over a serialized key.
+ *
+ * A batch is described as a sequence of series: runs of sequential rows with the same key
+ * (NULL or non-NULL).  Each series has a duplicate count and a NULL flag; only non-NULL series
+ * have a hash code and key data, so a separate non-NULL key position is tracked.  This class
+ * only reads the series arrays; subclasses are expected to fill them in for each batch.
+ */
+public abstract class VectorKeySeriesSingleImpl extends VectorKeySeriesImpl
+    implements VectorKeySeries {
+
+  private static final Log LOG = LogFactory.getLog(VectorKeySeriesSingleImpl.class.getName());
+
+  // The size of the current batch.  The duplicate counts of all series add up to this value.
+  protected int currentBatchSize;
+
+  // The number of keys (with sequential duplicates collapsed, both NULL and non-NULL) in the batch.
+  protected int seriesCount;
+
+  // The current position in the key series.
+  protected int seriesPosition;
+
+  // The number of duplicates for each series key (NULL or non-NULL).
+  protected final int[] duplicateCounts;
+
+  // Whether a series key is NULL.
+  protected final boolean[] seriesIsAllNull;
+
+  // The number of non-NULL keys. They have associated hash codes and key data.
+  protected int nonNullKeyCount;
+
+  // The current non-NULL key position.
+  protected int nonNullKeyPosition;
+
+  // The hash code for each non-NULL key.
+  protected final int[] hashCodes;
+
+  // Creates an empty series whose per-series arrays hold up to DEFAULT_SIZE entries.
+  VectorKeySeriesSingleImpl() {
+    super();
+
+    seriesCount = 0;
+    seriesPosition = 0;
+
+    duplicateCounts = new int[VectorizedRowBatch.DEFAULT_SIZE];
+    seriesIsAllNull = new boolean[VectorizedRowBatch.DEFAULT_SIZE];
+
+    nonNullKeyCount = 0;
+    nonNullKeyPosition = -1;
+
+    hashCodes = new int[VectorizedRowBatch.DEFAULT_SIZE];
+  }
+
+  /**
+   * Checks the invariants of the series built for the current batch.
+   *
+   * @return always true; a violated invariant is reported by exception instead
+   * @throws IllegalStateException if an invariant is violated
+   */
+  public boolean validate() {
+    Preconditions.checkState(seriesCount > 0);
+    Preconditions.checkState(seriesCount <= currentBatchSize);
+    Preconditions.checkState(nonNullKeyCount >= 0);
+    Preconditions.checkState(nonNullKeyCount <= seriesCount);
+
+    validateDuplicateCount();
+    return true;
+  }
+
+  // Every series must cover at least one row and together they must cover the whole batch.
+  private void validateDuplicateCount() {
+    int sum = 0;
+    int duplicateCount;
+    for (int i = 0; i < seriesCount; i++) {
+      duplicateCount = duplicateCounts[i];
+      Preconditions.checkState(duplicateCount > 0);
+      Preconditions.checkState(duplicateCount <= currentBatchSize);
+      sum += duplicateCount;
+    }
+    Preconditions.checkState(sum == currentBatchSize);
+  }
+
+  // Positions on the first series of the batch and loads its state.
+  @Override
+  public void positionToFirst() {
+    seriesPosition = 0;
+
+    currentLogical = 0;
+    currentDuplicateCount = duplicateCounts[0];
+    currentIsAllNull = seriesIsAllNull[0];
+
+    if (!currentIsAllNull) {
+      nonNullKeyPosition = 0;
+      currentHashCode = hashCodes[0];
+      setNextNonNullKey(0);
+    } else {
+      nonNullKeyPosition = -1;
+    }
+    Preconditions.checkState(currentDuplicateCount > 0);
+  }
+
+  // Consumes whole key.  Returns false once the batch has been exhausted.
+  @Override
+  public boolean next() {
+
+    currentLogical += currentDuplicateCount;
+    if (currentLogical >= currentBatchSize) {
+      return false;
+    }
+
+    Preconditions.checkState(seriesPosition + 1 < seriesCount);
+
+    seriesPosition++;
+    currentDuplicateCount = duplicateCounts[seriesPosition];
+    currentIsAllNull = seriesIsAllNull[seriesPosition];
+
+    if (!currentIsAllNull) {
+      Preconditions.checkState(nonNullKeyPosition + 1 < nonNullKeyCount);
+      nonNullKeyPosition++;
+      currentHashCode = hashCodes[nonNullKeyPosition];
+      setNextNonNullKey(nonNullKeyPosition);
+    }
+    Preconditions.checkState(currentDuplicateCount > 0);
+    return true;
+  }
+
+  /**
+   * Consumes part or all of the current key and, once it is used up, loads the next series.
+   *
+   * For use by VectorKeySeriesMulti so that the minimum equal key can be advanced.
+   *
+   * @param duplicateCount the number of rows of the current key to consume; must be positive
+   *          and no larger than the rows remaining for the current key
+   * @throws IllegalStateException if duplicateCount is out of that range
+   */
+  public void advance(int duplicateCount) {
+    Preconditions.checkState(duplicateCount > 0);
+    Preconditions.checkState(duplicateCount <= currentDuplicateCount);
+
+    // Step over only the rows actually consumed, not the whole remaining run.
+    currentLogical += duplicateCount;
+
+    currentDuplicateCount -= duplicateCount;
+    if (currentDuplicateCount > 0) {
+      return;
+    }
+
+    seriesPosition++;
+    if (seriesPosition >= seriesCount) {
+      // The last series is used up.  There is nothing more to load, and indexing the arrays
+      // here would read entries outside this batch or run past the end of the arrays.
+      return;
+    }
+    currentIsAllNull = seriesIsAllNull[seriesPosition];
+    currentDuplicateCount = duplicateCounts[seriesPosition];
+
+    if (!currentIsAllNull) {
+      nonNullKeyPosition++;
+      currentHashCode = hashCodes[nonNullKeyPosition];
+      setNextNonNullKey(nonNullKeyPosition);
+    }
+  }
+
+  // Called with the position of the non-NULL key that has just become the current key.
+  protected abstract void setNextNonNullKey(int nonNullKeyPosition);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index afea926..435b438 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -168,7 +168,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
// This helper object deserializes LazyBinary format small table values into columns of a row
// in a vectorized row batch.
- protected transient VectorDeserializeRow smallTableVectorDeserializeRow;
+ protected transient VectorDeserializeRow<LazyBinaryDeserializeRead> smallTableVectorDeserializeRow;
// This a 2nd batch with the same "column schema" as the big table batch that can be used to
// build join output results in. If we can create some join output results in the big table
@@ -573,10 +573,11 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
* Create our vectorized copy row and deserialize row helper objects.
*/
if (smallTableMapping.getCount() > 0) {
- smallTableVectorDeserializeRow = new VectorDeserializeRow(
- new LazyBinaryDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
- smallTableMapping.getTypeNames())));
+ smallTableVectorDeserializeRow =
+ new VectorDeserializeRow<LazyBinaryDeserializeRead>(
+ new LazyBinaryDeserializeRead(
+ VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
+ smallTableMapping.getTypeNames())));
smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 260f4e1..4e2bd7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
/**
@@ -73,7 +74,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinGenerateResultOperator.class.getName());
private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
- private transient PrimitiveTypeInfo[] bigTablePrimitiveTypeInfos;
+ //------------------------------------------------------------------------------------------------
+
+ private transient TypeInfo[] bigTableTypeInfos;
private transient VectorSerializeRow bigTableVectorSerializeRow;
@@ -417,7 +420,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
List<Integer> projectedColumns = vContext.getProjectedColumns();
int projectionSize = vContext.getProjectedColumns().size();
- List<PrimitiveTypeInfo> typeInfoList = new ArrayList<PrimitiveTypeInfo>();
+ List<TypeInfo> typeInfoList = new ArrayList<TypeInfo>();
List<Integer> noNullsProjectionList = new ArrayList<Integer>();
for (int i = 0; i < projectionSize; i++) {
int projectedColumn = projectedColumns.get(i);
@@ -429,17 +432,19 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
int[] noNullsProjection = ArrayUtils.toPrimitive(noNullsProjectionList.toArray(new Integer[0]));
int noNullsProjectionSize = noNullsProjection.length;
- bigTablePrimitiveTypeInfos = typeInfoList.toArray(new PrimitiveTypeInfo[0]);
+ bigTableTypeInfos = typeInfoList.toArray(new TypeInfo[0]);
bigTableVectorSerializeRow =
- new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjectionSize));
+ new VectorSerializeRow<LazyBinarySerializeWrite>(
+ new LazyBinarySerializeWrite(noNullsProjectionSize));
bigTableVectorSerializeRow.init(
- bigTablePrimitiveTypeInfos,
- noNullsProjectionList);
+ bigTableTypeInfos,
+ noNullsProjection);
- bigTableVectorDeserializeRow = new VectorDeserializeRow(
- new LazyBinaryDeserializeRead(bigTablePrimitiveTypeInfos));
+ bigTableVectorDeserializeRow =
+ new VectorDeserializeRow<LazyBinaryDeserializeRead>(
+ new LazyBinaryDeserializeRead(bigTableTypeInfos));
bigTableVectorDeserializeRow.init(noNullsProjection);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
index a2559f8..02a3746 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
// Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
@@ -65,7 +65,7 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
// Object that can take a set of columns in row in a vectorized row batch and serialized it.
// Known to not have any nulls.
- private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+ private transient VectorSerializeRow keyVectorSerializeWrite;
// The BinarySortable serialization of the current key.
private transient Output currentKeyOutput;
@@ -105,9 +105,9 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
* Initialize Multi-Key members for this specialized class.
*/
- keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+ keyVectorSerializeWrite = new VectorSerializeRow(
new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
- keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+ keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
currentKeyOutput = new Output();
saveKeyOutput = new Output();
@@ -194,8 +194,12 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
* Multi-Key specific repeated lookup.
*/
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, 0);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
byte[] keyBytes = currentKeyOutput.getData();
int keyLength = currentKeyOutput.getLength();
JoinUtil.JoinResult joinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[0]);
@@ -248,8 +252,12 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
*/
// Generate binary sortable key for current row in vectorized row batch.
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
/*
* Equal key series checking.
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
index 7e58c75..6b63200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
// Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
@@ -63,7 +63,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
// Object that can take a set of columns in row in a vectorized row batch and serialized it.
// Known to not have any nulls.
- private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+ private transient VectorSerializeRow keyVectorSerializeWrite;
// The BinarySortable serialization of the current key.
private transient Output currentKeyOutput;
@@ -103,9 +103,9 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
* Initialize Multi-Key members for this specialized class.
*/
- keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+ keyVectorSerializeWrite = new VectorSerializeRow(
new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
- keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+ keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
currentKeyOutput = new Output();
saveKeyOutput = new Output();
@@ -191,8 +191,12 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
* Multi-Key specific repeated lookup.
*/
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, 0);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
byte[] keyBytes = currentKeyOutput.getData();
int keyLength = currentKeyOutput.getLength();
JoinUtil.JoinResult joinResult = hashMap.lookup(keyBytes, 0, keyLength, hashMapResults[0]);
@@ -245,8 +249,12 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
*/
// Generate binary sortable key for current row in vectorized row batch.
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
/*
* Equal key series checking.
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
index 43e6fa7..f03bf6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
// Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
@@ -64,7 +64,7 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
// Object that can take a set of columns in row in a vectorized row batch and serialized it.
// Known to not have any nulls.
- private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+ private transient VectorSerializeRow keyVectorSerializeWrite;
// The BinarySortable serialization of the current key.
private transient Output currentKeyOutput;
@@ -104,9 +104,9 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
* Initialize Multi-Key members for this specialized class.
*/
- keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+ keyVectorSerializeWrite = new VectorSerializeRow(
new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
- keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+ keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
currentKeyOutput = new Output();
saveKeyOutput = new Output();
@@ -193,8 +193,12 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
* Multi-Key specific repeated lookup.
*/
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, 0);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
byte[] keyBytes = currentKeyOutput.getData();
int keyLength = currentKeyOutput.getLength();
// LOG.debug(CLASS_NAME + " processOp all " + displayBytes(keyBytes, 0, keyLength));
@@ -247,8 +251,12 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
*/
// Generate binary sortable key for current row in vectorized row batch.
- keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
- keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+ keyVectorSerializeWrite.setOutput(currentKeyOutput);
+ keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
+ // Not expecting NULLs in MapJoin -- they should have been filtered out.
+ throw new HiveException("Null key not expected in MapJoin");
+ }
// LOG.debug(CLASS_NAME + " currentKey " +
// VectorizedBatchUtil.displayBytes(currentKeyOutput.getData(), 0, currentKeyOutput.getLength()));
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
index 49e0e85..2c98a24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
@@ -294,9 +294,8 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera
// Generate binary sortable key for current row in vectorized row batch.
keyVectorSerializeWrite.setOutput(currentKeyOutput);
- boolean isNull = keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
-
- if (isNull) {
+ keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+ if (keyVectorSerializeWrite.getHasAnyNulls()) {
// Have that the NULL does not interfere with the current equal key series, if there
// is one. We do not set saveJoinResult.
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 36ee768..0ff98bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single byte array value hash map optimized for vector map join.
@@ -71,7 +72,7 @@ public abstract class VectorMapJoinFastBytesHashMap
optimizedHashMapResult.forget();
- long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+ long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
JoinUtil.JoinResult joinResult;
if (valueRefWord == -1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index fc04504..5d8ed2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single byte array value hash multi-set optimized for vector map join.
@@ -67,7 +68,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
optimizedHashMultiSetResult.forget();
- long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+ long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
JoinUtil.JoinResult joinResult;
if (count == -1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index bac10df..990a2e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single byte array value hash multi-set optimized for vector map join.
@@ -62,7 +63,7 @@ public abstract class VectorMapJoinFastBytesHashSet
optimizedHashSetResult.forget();
- long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+ long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
JoinUtil.JoinResult joinResult;
if (existance == -1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index c06482b..b978bf0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -70,7 +71,7 @@ public abstract class VectorMapJoinFastBytesHashTable
expandAndRehash();
}
- long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+ long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
int intHashCode = (int) hashCode;
int slot = (intHashCode & logicalHashBucketMask);
long probeSlot = slot;
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
index 28f7357..80126ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.hive.serde2.WriteBuffers;
public class VectorMapJoinFastBytesHashUtil {
- public static long hashKey(byte[] bytes, int start, int length) {
- return WriteBuffers.murmurHash(bytes, start, length);
- }
-
public static String displayBytes(byte[] bytes, int start, int length) {
StringBuilder sb = new StringBuilder();
for (int i = start; i < start + length; i++) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
deleted file mode 100644
index a818cb2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
-
-public class VectorMapJoinFastIntHashUtil {
-
- public static int hashKey(int key) {
- key = ~key + (key << 15); // key = (key << 15) - key - 1;
- key = key ^ (key >>> 12);
- key = key + (key << 2);
- key = key ^ (key >>> 4);
- key = key * 2057; // key = (key + (key << 3)) + (key << 11);
- key = key ^ (key >>> 16);
- return key;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 149f1d0..1384fc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHash
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single long value map optimized for vector map join.
@@ -67,7 +68,7 @@ public class VectorMapJoinFastLongHashMap
optimizedHashMapResult.forget();
- long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+ long hashCode = HashCodeUtil.calculateLongHashCode(key);
// LOG.debug("VectorMapJoinFastLongHashMap lookup " + key + " hashCode " + hashCode);
long valueRef = findReadSlot(key, hashCode);
JoinUtil.JoinResult joinResult;
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 87c17e7..94bf706 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single long value multi-set optimized for vector map join.
@@ -67,7 +68,7 @@ public class VectorMapJoinFastLongHashMultiSet
optimizedHashMultiSetResult.forget();
- long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+ long hashCode = HashCodeUtil.calculateLongHashCode(key);
long count = findReadSlot(key, hashCode);
JoinUtil.JoinResult joinResult;
if (count == -1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d5aa99c..2cbc548 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHash
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An single long value multi-set optimized for vector map join.
@@ -60,7 +61,7 @@ public class VectorMapJoinFastLongHashSet
optimizedHashSetResult.forget();
- long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+ long hashCode = HashCodeUtil.calculateLongHashCode(key);
long existance = findReadSlot(key, hashCode);
JoinUtil.JoinResult joinResult;
if (existance == -1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 5b48fcf..7ea3455 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeseriali
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
import org.apache.tez.runtime.library.api.KeyValueReader;
import com.google.common.annotations.VisibleForTesting;
@@ -111,7 +112,7 @@ public abstract class VectorMapJoinFastLongHashTable
expandAndRehash();
}
- long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+ long hashCode = HashCodeUtil.calculateLongHashCode(key);
int intHashCode = (int) hashCode;
int slot = (intHashCode & logicalHashBucketMask);
long probeSlot = slot;
@@ -179,7 +180,7 @@ public abstract class VectorMapJoinFastLongHashTable
long tableKey = slotPairs[pairIndex + 1];
// Copy to new slot table.
- long hashCode = VectorMapJoinFastLongHashUtil.hashKey(tableKey);
+ long hashCode = HashCodeUtil.calculateLongHashCode(tableKey);
int intHashCode = (int) hashCode;
int newSlot = intHashCode & newLogicalHashBucketMask;
long newProbeSlot = newSlot;
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
index 298ca61..1877f14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
@@ -25,17 +25,6 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeseriali
public class VectorMapJoinFastLongHashUtil {
- public static long hashKey(long key) {
- key = (~key) + (key << 21); // key = (key << 21) - key - 1;
- key = key ^ (key >>> 24);
- key = (key + (key << 3)) + (key << 8); // key * 265
- key = key ^ (key >>> 14);
- key = (key + (key << 2)) + (key << 4); // key * 21
- key = key ^ (key >>> 28);
- key = key + (key << 31);
- return key;
- }
-
public static long deserializeLongKey(BinarySortableDeserializeRead keyBinarySortableDeserializeRead,
HashTableKeyType hashTableKeyType) throws IOException {
long key = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
new file mode 100644
index 0000000..a79a649
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/**
+ * This class is the common operator class for native vectorized reduce sinks.
+ */
+public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<ReduceSinkDesc>
+ implements VectorizationContextRegion {
+
+ private static final long serialVersionUID = 1L;
+ private static final String CLASS_NAME = VectorReduceSinkCommonOperator.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ protected VectorReduceSinkDesc vectorDesc;
+
+ /**
+ * Information about our native vectorized reduce sink created by the Vectorizer class during
+ * its decision process and useful for execution.
+ */
+ protected VectorReduceSinkInfo vectorReduceSinkInfo;
+
+ protected VectorizationContext vContext;
+
+ /**
+ * Reduce sink key vector expressions.
+ */
+
+ // This is the map of which vectorized row batch columns are the key columns.
+ // And, their types.
+ protected int[] reduceSinkKeyColumnMap;
+ protected TypeInfo[] reduceSinkKeyTypeInfos;
+
+ // Optional vectorized key expressions that need to be run on each batch.
+ protected VectorExpression[] reduceSinkKeyExpressions;
+
+ // This is the map of which vectorized row batch columns are the value columns.
+ // And, their types.
+ protected int[] reduceSinkValueColumnMap;
+ protected TypeInfo[] reduceSinkValueTypeInfos;
+
+ // Optional vectorized value expressions that need to be run on each batch.
+ protected VectorExpression[] reduceSinkValueExpressions;
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
+ // Whether there is to be a tag added to the end of each key and the tag value.
+ private transient boolean reduceSkipTag;
+ private transient byte reduceTagByte;
+
+ // Binary sortable key serializer.
+ protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite;
+
+ // The serialized all null key and its hash code.
+ private transient byte[] nullBytes;
+ private transient int nullKeyHashCode;
+
+ // Lazy binary value serializer.
+ private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+ // This helper object serializes LazyBinary format reducer values from columns of a row
+ // in a vectorized row batch.
+ private transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+ // The output buffer used to serialize a value into.
+ private transient Output valueOutput;
+
+ // The hive key and bytes writable value needed to pass the key and value to the collector.
+ private transient HiveKey keyWritable;
+ private transient BytesWritable valueBytesWritable;
+
+ // Where to write our key and value pairs.
+ private transient OutputCollector out;
+
+ // The object that determines equal key series.
+ protected transient VectorKeySeriesSerialized serializedKeySeries;
+
+ private transient long numRows = 0;
+ private transient long cntr = 1;
+ private transient long logEveryNRows = 0;
+ private final transient LongWritable recordCounter = new LongWritable();
+
+ // For debug tracing: the name of the map or reduce task.
+ protected transient String taskName;
+
+ // Debug display.
+ protected transient long batchCounter;
+
+ //---------------------------------------------------------------------------
+
+ public VectorReduceSinkCommonOperator() {
+ super();
+ }
+
+ public VectorReduceSinkCommonOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ super();
+
+ ReduceSinkDesc desc = (ReduceSinkDesc) conf;
+ this.conf = desc;
+ vectorDesc = desc.getVectorDesc();
+ vectorReduceSinkInfo = vectorDesc.getVectorReduceSinkInfo();
+ this.vContext = vContext;
+
+ // Since a key expression can be a calculation and the key will go into a scratch column,
+ // we need the mapping and type information.
+ reduceSinkKeyColumnMap = vectorReduceSinkInfo.getReduceSinkKeyColumnMap();
+ reduceSinkKeyTypeInfos = vectorReduceSinkInfo.getReduceSinkKeyTypeInfos();
+ reduceSinkKeyExpressions = vectorReduceSinkInfo.getReduceSinkKeyExpressions();
+
+ reduceSinkValueColumnMap = vectorReduceSinkInfo.getReduceSinkValueColumnMap();
+ reduceSinkValueTypeInfos = vectorReduceSinkInfo.getReduceSinkValueTypeInfos();
+ reduceSinkValueExpressions = vectorReduceSinkInfo.getReduceSinkValueExpressions();
+ }
+
+ // Get the sort order
+ private boolean[] getColumnSortOrder(Properties properties, int columnCount) {
+ String columnSortOrder = properties.getProperty(serdeConstants.SERIALIZATION_SORT_ORDER);
+ boolean[] columnSortOrderIsDesc = new boolean[columnCount];
+ if (columnSortOrder == null) {
+ Arrays.fill(columnSortOrderIsDesc, false);
+ } else {
+ for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
+ columnSortOrderIsDesc[i] = (columnSortOrder.charAt(i) == '-');
+ }
+ }
+ return columnSortOrderIsDesc;
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+
+ if (LOG.isDebugEnabled()) {
+ // Determine the name of our map or reduce task for debug tracing.
+ BaseWork work = Utilities.getMapWork(hconf);
+ if (work == null) {
+ work = Utilities.getReduceWork(hconf);
+ }
+ taskName = work.getName();
+ }
+
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
+ reduceSkipTag = conf.getSkipTag();
+ reduceTagByte = (byte) conf.getTag();
+
+ if (isLogInfoEnabled) {
+ LOG.info("Using tag = " + (int) reduceTagByte);
+ }
+
+ TableDesc keyTableDesc = conf.getKeySerializeInfo();
+ boolean[] columnSortOrder =
+ getColumnSortOrder(keyTableDesc.getProperties(), reduceSinkKeyColumnMap.length);
+
+ keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder);
+
+ // Create all nulls key.
+ try {
+ Output nullKeyOutput = new Output();
+ keyBinarySortableSerializeWrite.set(nullKeyOutput);
+ for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+ keyBinarySortableSerializeWrite.writeNull();
+ }
+ int nullBytesLength = nullKeyOutput.getLength();
+ nullBytes = new byte[nullBytesLength];
+ System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
+ nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length);
+
+ valueVectorSerializeRow =
+ new VectorSerializeRow<LazyBinarySerializeWrite>(
+ valueLazyBinarySerializeWrite);
+ valueVectorSerializeRow.init(reduceSinkValueTypeInfos, reduceSinkValueColumnMap);
+
+ valueOutput = new Output();
+ valueVectorSerializeRow.setOutput(valueOutput);
+
+ keyWritable = new HiveKey();
+
+ valueBytesWritable = new BytesWritable();
+
+ batchCounter = 0;
+ }
+
+ @Override
+ public void process(Object row, int tag) throws HiveException {
+
+ try {
+ VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+ batchCounter++;
+
+ if (batch.size == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+ }
+ return;
+ }
+
+ // Perform any key expressions. Results will go into scratch columns.
+ if (reduceSinkKeyExpressions != null) {
+ for (VectorExpression ve : reduceSinkKeyExpressions) {
+ ve.evaluate(batch);
+ }
+ }
+
+ // Perform any value expressions. Results will go into scratch columns.
+ if (reduceSinkValueExpressions != null) {
+ for (VectorExpression ve : reduceSinkValueExpressions) {
+ ve.evaluate(batch);
+ }
+ }
+
+ serializedKeySeries.processBatch(batch);
+
+ boolean selectedInUse = batch.selectedInUse;
+ int[] selected = batch.selected;
+
+ int keyLength;
+ int logical;
+ int end;
+ int batchIndex;
+ do {
+ if (serializedKeySeries.getCurrentIsAllNull()) {
+
+ // Use the same logic as ReduceSinkOperator.toHiveKey.
+ //
+ if (tag == -1 || reduceSkipTag) {
+ keyWritable.set(nullBytes, 0, nullBytes.length);
+ } else {
+ keyWritable.setSize(nullBytes.length + 1);
+ System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
+ keyWritable.get()[nullBytes.length] = reduceTagByte;
+ }
+ keyWritable.setDistKeyLength(nullBytes.length);
+ keyWritable.setHashCode(nullKeyHashCode);
+
+ } else {
+
+ // One serialized key for 1 or more rows for the duplicate keys.
+ // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength());
+ // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength());
+ keyLength = serializedKeySeries.getSerializedLength();
+ if (tag == -1 || reduceSkipTag) {
+ keyWritable.set(serializedKeySeries.getSerializedBytes(),
+ serializedKeySeries.getSerializedStart(), keyLength);
+ } else {
+ keyWritable.setSize(keyLength + 1);
+ System.arraycopy(serializedKeySeries.getSerializedBytes(),
+ serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = reduceTagByte;
+ }
+ keyWritable.setDistKeyLength(keyLength);
+ keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
+ }
+
+ logical = serializedKeySeries.getCurrentLogical();
+ end = logical + serializedKeySeries.getCurrentDuplicateCount();
+ do {
+ batchIndex = (selectedInUse ? selected[logical] : logical);
+
+ valueLazyBinarySerializeWrite.reset();
+ valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+ valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+
+ collect(keyWritable, valueBytesWritable);
+ } while (++logical < end);
+
+ if (!serializedKeySeries.next()) {
+ break;
+ }
+ } while (true);
+
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
+ // Since this is a terminal operator, update counters explicitly -
+ // forward is not called
+ if (null != out) {
+ numRows++;
+ if (isLogInfoEnabled) {
+ if (numRows == cntr) {
+ cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+ if (cntr < 0 || numRows < 0) {
+ cntr = 0;
+ numRows = 1;
+ }
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ }
+
+ // BytesWritable valueBytesWritable = (BytesWritable) valueWritable;
+ // LOG.info("VectorReduceSinkCommonOperator collect keyWritable " + keyWritable.getLength() + " " +
+ // VectorizedBatchUtil.displayBytes(keyWritable.getBytes(), 0, keyWritable.getLength()) +
+ // " valueWritable " + valueBytesWritable.getLength() +
+ // VectorizedBatchUtil.displayBytes(valueBytesWritable.getBytes(), 0, valueBytesWritable.getLength()));
+
+ out.collect(keyWritable, valueWritable);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ super.closeOp(abort);
+ out = null;
+ if (isLogInfoEnabled) {
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ recordCounter.set(numRows);
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "RS";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.REDUCESINK;
+ }
+
+ @Override
+ public VectorizationContext getOuputVectorizationContext() {
+ return vContext;
+ }
+
+ @Override
+ public boolean getIsReduceSink() {
+ return true;
+ }
+
+ @Override
+ public String getReduceOutputName() {
+ return conf.getOutputName();
+ }
+
+ @Override
+ public void setOutputCollector(OutputCollector _out) {
+ this.out = _out;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
new file mode 100644
index 0000000..cec5660
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesLongSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on a single long key column.
+ */
+public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator {
+
+ private static final long serialVersionUID = 1L;
+ private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ // The column number and type information for this one column long reduce key.
+ private transient int singleKeyColumn;
+ private transient PrimitiveTypeInfo singleKeyColumnPrimitiveTypeInfo;
+
+ // The above members are transient because they are assigned in initializeOp,
+ // not by the constructor.
+ //---------------------------------------------------------------------------
+
+ //---------------------------------------------------------------------------
+ // Pass-thru constructors.
+ //
+
+ public VectorReduceSinkLongOperator() {
+ super();
+ }
+
+ public VectorReduceSinkLongOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ super(vContext, conf);
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+
+ singleKeyColumn = reduceSinkKeyColumnMap[0];
+ singleKeyColumnPrimitiveTypeInfo = (PrimitiveTypeInfo) reduceSinkKeyTypeInfos[0];
+
+ serializedKeySeries =
+ new VectorKeySeriesLongSerialized<BinarySortableSerializeWrite>(
+ singleKeyColumn, singleKeyColumnPrimitiveTypeInfo, keyBinarySortableSerializeWrite);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
new file mode 100644
index 0000000..a4ef66b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesMultiSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on multiple key columns
+ * (or a single non-long / non-string column).
+ */
+public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator {
+
+ private static final long serialVersionUID = 1L;
+ private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
+ //---------------------------------------------------------------------------
+ // Pass-thru constructors.
+ //
+
+ public VectorReduceSinkMultiKeyOperator() {
+ super();
+ }
+
+ public VectorReduceSinkMultiKeyOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ super(vContext, conf);
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+
+ VectorKeySeriesMultiSerialized<BinarySortableSerializeWrite> serializedMultiKeySeries =
+ new VectorKeySeriesMultiSerialized<BinarySortableSerializeWrite>(
+ keyBinarySortableSerializeWrite);
+ serializedMultiKeySeries.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap);
+
+ serializedKeySeries = serializedMultiKeySeries;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
new file mode 100644
index 0000000..b6cb527
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesBytesSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on a single string key column.
+ */
+public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator {
+
+ private static final long serialVersionUID = 1L;
+ private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ // The column number and type information for this one column string reduce key.
+ private transient int singleKeyColumn;
+
+ // The above member is transient because it is assigned in initializeOp,
+ // not by the constructor.
+ //---------------------------------------------------------------------------
+
+ //---------------------------------------------------------------------------
+ // Pass-thru constructors.
+ //
+
+ public VectorReduceSinkStringOperator() {
+ super();
+ }
+
+ public VectorReduceSinkStringOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ super(vContext, conf);
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+
+ singleKeyColumn = reduceSinkKeyColumnMap[0];
+
+ serializedKeySeries =
+ new VectorKeySeriesBytesSerialized<BinarySortableSerializeWrite>(
+ singleKeyColumn, keyBinarySortableSerializeWrite);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 4dead18..97e7013 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -53,7 +55,10 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
@@ -61,7 +66,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -89,10 +97,12 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
@@ -100,6 +110,8 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -140,8 +152,10 @@ import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.*;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -1716,6 +1730,189 @@ public class Vectorizer implements PhysicalPlanResolver {
return specialize;
}
+ /**
+  * Creates the specialized native vector reduce sink operator for a reduce sink that
+  * canSpecializeReduceSink has already accepted.
+  *
+  * The operator class is chosen from the reduce key shape:
+  *   - exactly one key column of column vector type LONG whose primitive category is
+  *     BOOLEAN, BYTE, SHORT, INT or LONG: VectorReduceSinkLongOperator;
+  *   - exactly one key column of column vector type BYTES: VectorReduceSinkStringOperator;
+  *   - anything else: VectorReduceSinkMultiKeyOperator.
+  *
+  * As a side effect a new VectorReduceSinkDesc carrying the chosen key type and the given
+  * vectorReduceSinkInfo is attached to desc.
+  *
+  * @param op the row-mode reduce sink operator being vectorized; its conf is handed to the
+  *           operator factory
+  * @param vContext the vectorization context passed to the new operator
+  * @param desc the reduce sink descriptor that receives the new vector descriptor
+  * @param vectorReduceSinkInfo key/value column information; its key column vector types
+  *           and key type infos are read here
+  * @return the specialized vector reduce sink operator
+  * @throws HiveException if the key type is not one this method knows how to map to an
+  *           operator class, or if operator creation fails
+  */
+ private Operator<? extends OperatorDesc> specializeReduceSinkOperator(
+     Operator<? extends OperatorDesc> op, VectorizationContext vContext, ReduceSinkDesc desc,
+     VectorReduceSinkInfo vectorReduceSinkInfo) throws HiveException {
+
+   Type[] reduceSinkKeyColumnVectorTypes = vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes();
+
+   // By default, we can always use the multi-key class.
+   VectorReduceSinkDesc.ReduceSinkKeyType reduceSinkKeyType =
+       VectorReduceSinkDesc.ReduceSinkKeyType.MULTI_KEY;
+
+   // Look for single column optimization.
+   if (reduceSinkKeyColumnVectorTypes.length == 1) {
+     TypeInfo keyTypeInfo = vectorReduceSinkInfo.getReduceSinkKeyTypeInfos()[0];
+     LOG.info("Vectorizer vectorizeOperator reduce sink key typeName " + keyTypeInfo);
+     switch (reduceSinkKeyColumnVectorTypes[0]) {
+     case LONG:
+       {
+         // Several Hive types share the LONG column vector; only the plain integer family
+         // (and boolean) is handled by the long-key operator.
+         PrimitiveCategory primitiveCategory =
+             ((PrimitiveTypeInfo) keyTypeInfo).getPrimitiveCategory();
+         switch (primitiveCategory) {
+         case BOOLEAN:
+         case BYTE:
+         case SHORT:
+         case INT:
+         case LONG:
+           reduceSinkKeyType = VectorReduceSinkDesc.ReduceSinkKeyType.LONG;
+           break;
+         default:
+           // Other integer types not supported yet.
+           break;
+         }
+       }
+       break;
+     case BYTES:
+       reduceSinkKeyType = VectorReduceSinkDesc.ReduceSinkKeyType.STRING;
+       // Explicit break: do not rely on falling through into the default case.
+       break;
+     default:
+       // Stay with multi-key.
+       break;
+     }
+   }
+
+   Class<? extends Operator<?>> opClass;
+   switch (reduceSinkKeyType) {
+   case LONG:
+     opClass = VectorReduceSinkLongOperator.class;
+     break;
+   case STRING:
+     opClass = VectorReduceSinkStringOperator.class;
+     break;
+   case MULTI_KEY:
+     opClass = VectorReduceSinkMultiKeyOperator.class;
+     break;
+   default:
+     throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType);
+   }
+
+   // Publish the decision on the descriptor before the operator is constructed.
+   VectorReduceSinkDesc vectorDesc = new VectorReduceSinkDesc();
+   desc.setVectorDesc(vectorDesc);
+   vectorDesc.setReduceSinkKeyType(reduceSinkKeyType);
+   vectorDesc.setVectorReduceSinkInfo(vectorReduceSinkInfo);
+
+   Operator<? extends OperatorDesc> vectorOp =
+       OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+   LOG.info("Vectorizer vectorizeOperator reduce sink class " + vectorOp.getClass().getSimpleName());
+
+   return vectorOp;
+ }
+
+ private boolean canSpecializeReduceSink(ReduceSinkDesc desc,
+ boolean isTez, VectorizationContext vContext,
+ VectorReduceSinkInfo vectorReduceSinkInfo) throws HiveException {
+
+ if (!HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCESINK_NEW_ENABLED)) {
+ return false;
+ }
+
+ // Many restrictions.
+
+ if (!isTez) {
+ return false;
+ }
+
+ if (desc.getWriteType() == AcidUtils.Operation.UPDATE ||
+ desc.getWriteType() == AcidUtils.Operation.DELETE) {
+ return false;
+ }
+
+ if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
+ return false;
+ }
+
+ boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM);
+ if (!useUniformHash) {
+ return false;
+ }
+
+ if (desc.getTopN() >= 0) {
+ return false;
+ }
+
+ if (desc.getDistinctColumnIndices().size() > 0) {
+ return false;
+ }
+
+ TableDesc keyTableDesc = desc.getKeySerializeInfo();
+ Class<? extends Deserializer> keySerializerClass = keyTableDesc.getDeserializerClass();
+ if (keySerializerClass != org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe.class) {
+ return false;
+ }
+
+ TableDesc valueTableDesc = desc.getValueSerializeInfo();
+ Class<? extends Deserializer> valueDeserializerClass = valueTableDesc.getDeserializerClass();
+ if (valueDeserializerClass != org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class) {
+ return false;
+ }
+
+ // We are doing work here we'd normally do in VectorGroupByCommonOperator's constructor.
+ // So if we later decide not to specialize, we'll just waste any scratch columns allocated...
+
+ List<ExprNodeDesc> keysDescs = desc.getKeyCols();
+ VectorExpression[] allKeyExpressions = vContext.getVectorExpressions(keysDescs);
+
+ // Since a key expression can be a calculation and the key will go into a scratch column,
+ // we need the mapping and type information.
+ int[] reduceSinkKeyColumnMap = new int[allKeyExpressions.length];
+ TypeInfo[] reduceSinkKeyTypeInfos = new TypeInfo[allKeyExpressions.length];
+ Type[] reduceSinkKeyColumnVectorTypes = new Type[allKeyExpressions.length];
+ ArrayList<VectorExpression> groupByKeyExpressionsList = new ArrayList<VectorExpression>();
+ VectorExpression[] reduceSinkKeyExpressions;
+ for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+ VectorExpression ve = allKeyExpressions[i];
+ reduceSinkKeyColumnMap[i] = ve.getOutputColumn();
+ reduceSinkKeyTypeInfos[i] = keysDescs.get(i).getTypeInfo();
+ reduceSinkKeyColumnVectorTypes[i] =
+ VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkKeyTypeInfos[i]);
+ if (!IdentityExpression.isColumnOnly(ve)) {
+ groupByKeyExpressionsList.add(ve);
+ }
+ }
+ if (groupByKeyExpressionsList.size() == 0) {
+ reduceSinkKeyExpressions = null;
+ } else {
+ reduceSinkKeyExpressions = groupByKeyExpressionsList.toArray(new VectorExpression[0]);
+ }
+
+ ArrayList<ExprNodeDesc> valueDescs = desc.getValueCols();
+ VectorExpression[] allValueExpressions = vContext.getVectorExpressions(valueDescs);
+
+ int[] reduceSinkValueColumnMap = new int[valueDescs.size()];
+ TypeInfo[] reduceSinkValueTypeInfos = new TypeInfo[valueDescs.size()];
+ Type[] reduceSinkValueColumnVectorTypes = new Type[valueDescs.size()];
+ ArrayList<VectorExpression> reduceSinkValueExpressionsList = new ArrayList<VectorExpression>();
+ VectorExpression[] reduceSinkValueExpressions;
+ for (int i = 0; i < valueDescs.size(); ++i) {
+ VectorExpression ve = allValueExpressions[i];
+ reduceSinkValueColumnMap[i] = ve.getOutputColumn();
+ reduceSinkValueTypeInfos[i] = valueDescs.get(i).getTypeInfo();
+ reduceSinkValueColumnVectorTypes[i] =
+ VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkValueTypeInfos[i]);
+ if (!IdentityExpression.isColumnOnly(ve)) {
+ reduceSinkValueExpressionsList.add(ve);
+ }
+ }
+ if (reduceSinkValueExpressionsList.size() == 0) {
+ reduceSinkValueExpressions = null;
+ } else {
+ reduceSinkValueExpressions = reduceSinkValueExpressionsList.toArray(new VectorExpression[0]);
+ }
+
+ vectorReduceSinkInfo.setReduceSinkKeyColumnMap(reduceSinkKeyColumnMap);
+ vectorReduceSinkInfo.setReduceSinkKeyTypeInfos(reduceSinkKeyTypeInfos);
+ vectorReduceSinkInfo.setReduceSinkKeyColumnVectorTypes(reduceSinkKeyColumnVectorTypes);
+ vectorReduceSinkInfo.setReduceSinkKeyExpressions(reduceSinkKeyExpressions);
+
+ vectorReduceSinkInfo.setReduceSinkValueColumnMap(reduceSinkValueColumnMap);
+ vectorReduceSinkInfo.setReduceSinkValueTypeInfos(reduceSinkValueTypeInfos);
+ vectorReduceSinkInfo.setReduceSinkValueColumnVectorTypes(reduceSinkValueColumnVectorTypes);
+ vectorReduceSinkInfo.setReduceSinkValueExpressions(reduceSinkValueExpressions);
+
+ return true;
+ }
+
Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
VectorizationContext vContext, boolean isTez) throws HiveException {
Operator<? extends OperatorDesc> vectorOp = null;
@@ -1756,11 +1953,28 @@ public class Vectorizer implements PhysicalPlanResolver {
}
}
break;
+
+ case REDUCESINK:
+ {
+ VectorReduceSinkInfo vectorReduceSinkInfo = new VectorReduceSinkInfo();
+ ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
+ boolean specialize = canSpecializeReduceSink(desc, isTez, vContext, vectorReduceSinkInfo);
+
+ if (!specialize) {
+
+ vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+
+ } else {
+
+ vectorOp = specializeReduceSinkOperator(op, vContext, desc, vectorReduceSinkInfo);
+
+ }
+ }
+ break;
case GROUPBY:
case FILTER:
case SELECT:
case FILESINK:
- case REDUCESINK:
case LIMIT:
case EXTRACT:
case EVENT:
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 615739e..2f69b7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -120,6 +120,10 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
private transient boolean hasOrderBy = false;
private static transient Logger LOG = LoggerFactory.getLogger(ReduceSinkDesc.class);
+
+ // Extra parameters only for vectorization.
+ private VectorReduceSinkDesc vectorDesc;
+
public ReduceSinkDesc() {
}
@@ -146,6 +150,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
this.setNumBuckets(-1);
this.setBucketCols(null);
this.writeType = writeType;
+ this.vectorDesc = null;
}
@Override
@@ -175,9 +180,21 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
desc.reduceTraits = reduceTraits.clone();
desc.setDeduplicated(isDeduplicated);
desc.setHasOrderBy(hasOrderBy);
+ if (vectorDesc != null) {
+ throw new RuntimeException("Clone with vectorization desc not supported");
+ }
+ desc.vectorDesc = null;
return desc;
}
+ /**
+  * Attaches the extra, vectorization-only parameters to this reduce sink descriptor.
+  *
+  * @param vectorDesc the vector reduce sink descriptor to store; replaces any previous value
+  */
+ public void setVectorDesc(VectorReduceSinkDesc vectorDesc) {
+ this.vectorDesc = vectorDesc;
+ }
+
+ /**
+  * Returns the extra, vectorization-only parameters stored on this descriptor.
+  *
+  * @return the stored vector reduce sink descriptor, or null if none has been set
+  */
+ public VectorReduceSinkDesc getVectorDesc() {
+ return vectorDesc;
+ }
+
public java.util.ArrayList<java.lang.String> getOutputKeyColumnNames() {
return outputKeyColumnNames;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
new file mode 100644
index 0000000..c56bff6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * VectorReduceSinkDesc.
+ *
+ * Extra parameters beyond ReduceSinkDesc just for the VectorReduceSinkOperator.
+ *
+ * We don't extend ReduceSinkDesc because the base OperatorDesc doesn't support
+ * clone and adding it is a lot of work for little gain.
+ */
+public class VectorReduceSinkDesc extends AbstractVectorDesc {
+
+  // Java serialization only honors serialVersionUID when it is declared static final;
+  // a non-final declaration is ignored and a computed default is used instead.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The shape of the reduce key, which determines the specialized operator class.
+   * NONE is the value a freshly constructed descriptor starts with.
+   */
+  public static enum ReduceSinkKeyType {
+    NONE,
+    LONG,
+    STRING,
+    MULTI_KEY
+  }
+
+  private ReduceSinkKeyType reduceSinkKeyType;
+
+  private VectorReduceSinkInfo vectorReduceSinkInfo;
+
+  /**
+   * Creates a descriptor with key type NONE and no reduce sink info.
+   */
+  public VectorReduceSinkDesc() {
+    reduceSinkKeyType = ReduceSinkKeyType.NONE;
+    vectorReduceSinkInfo = null;
+  }
+
+  /**
+   * @return the reduce key type currently recorded on this descriptor
+   */
+  public ReduceSinkKeyType reduceSinkKeyType() {
+    return reduceSinkKeyType;
+  }
+
+  /**
+   * @param reduceSinkKeyType the reduce key type to record
+   */
+  public void setReduceSinkKeyType(ReduceSinkKeyType reduceSinkKeyType) {
+    this.reduceSinkKeyType = reduceSinkKeyType;
+  }
+
+  /**
+   * @param vectorReduceSinkInfo the key/value column information to record
+   */
+  public void setVectorReduceSinkInfo(VectorReduceSinkInfo vectorReduceSinkInfo) {
+    this.vectorReduceSinkInfo = vectorReduceSinkInfo;
+  }
+
+  /**
+   * @return the recorded key/value column information, or null if none has been set
+   */
+  public VectorReduceSinkInfo getVectorReduceSinkInfo() {
+    return vectorReduceSinkInfo;
+  }
+}