You are viewing a plain-text version of this content; the hyperlink to its canonical version (the word "here") was not preserved in this rendering.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/05/02 17:58:01 UTC
[2/2] hive git commit: HIVE-16546: LLAP: Fail map join tasks if hash
table memory exceeds threshold (Prasanth Jayachandran reviewed by Sergey
Shelukhin)
HIVE-16546: LLAP: Fail map join tasks if hash table memory exceeds threshold (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ffff404
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ffff404
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ffff404
Branch: refs/heads/master
Commit: 0ffff404088a428da752a60a0847f51845e618ff
Parents: c2637e6
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 2 10:53:19 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 2 10:53:19 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/MemoryEstimate.java | 29 ++++++++++
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 ++
.../llap/IncrementalObjectSizeEstimator.java | 4 +-
.../UDAFTemplates/VectorUDAFAvg.txt | 2 +-
.../UDAFTemplates/VectorUDAFMinMax.txt | 2 +-
.../UDAFTemplates/VectorUDAFMinMaxDecimal.txt | 2 +-
.../VectorUDAFMinMaxIntervalDayTime.txt | 2 +-
.../UDAFTemplates/VectorUDAFMinMaxString.txt | 4 +-
.../UDAFTemplates/VectorUDAFMinMaxTimestamp.txt | 2 +-
.../UDAFTemplates/VectorUDAFSum.txt | 2 +-
.../UDAFTemplates/VectorUDAFVar.txt | 2 +-
.../UDAFTemplates/VectorUDAFVarDecimal.txt | 4 +-
.../mapjoin/MapJoinMemoryExhaustionError.java | 28 ++++++++++
.../MapJoinMemoryExhaustionException.java | 29 ----------
.../mapjoin/MapJoinMemoryExhaustionHandler.java | 6 +-
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 5 +-
.../persistence/BytesBytesMultiHashMap.java | 17 +++++-
.../ql/exec/persistence/HashMapWrapper.java | 10 +++-
.../persistence/HybridHashTableContainer.java | 5 ++
.../persistence/MapJoinBytesTableContainer.java | 58 +++++++++++++++++++-
.../exec/persistence/MapJoinTableContainer.java | 3 +-
.../hive/ql/exec/tez/HashTableLoader.java | 42 ++++++++++++--
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 11 +++-
.../vector/VectorAggregationBufferBatch.java | 4 +-
.../ql/exec/vector/VectorGroupByOperator.java | 2 +-
.../aggregates/VectorAggregateExpression.java | 2 +-
.../aggregates/VectorUDAFAvgDecimal.java | 2 +-
.../aggregates/VectorUDAFAvgTimestamp.java | 2 +-
.../aggregates/VectorUDAFBloomFilter.java | 4 +-
.../aggregates/VectorUDAFBloomFilterMerge.java | 2 +-
.../expressions/aggregates/VectorUDAFCount.java | 2 +-
.../aggregates/VectorUDAFCountMerge.java | 2 +-
.../aggregates/VectorUDAFCountStar.java | 2 +-
.../aggregates/VectorUDAFStdPopTimestamp.java | 2 +-
.../aggregates/VectorUDAFStdSampTimestamp.java | 2 +-
.../aggregates/VectorUDAFSumDecimal.java | 2 +-
.../aggregates/VectorUDAFVarPopTimestamp.java | 2 +-
.../aggregates/VectorUDAFVarSampTimestamp.java | 2 +-
.../fast/VectorMapJoinFastBytesHashMap.java | 5 ++
.../VectorMapJoinFastBytesHashMultiSet.java | 5 ++
.../fast/VectorMapJoinFastBytesHashSet.java | 5 ++
.../fast/VectorMapJoinFastBytesHashTable.java | 6 ++
.../fast/VectorMapJoinFastHashTable.java | 7 +++
.../fast/VectorMapJoinFastHashTableLoader.java | 47 +++++++++++++++-
.../mapjoin/fast/VectorMapJoinFastKeyStore.java | 11 +++-
.../fast/VectorMapJoinFastLongHashMap.java | 9 ++-
.../fast/VectorMapJoinFastLongHashMultiSet.java | 5 ++
.../fast/VectorMapJoinFastLongHashSet.java | 5 ++
.../fast/VectorMapJoinFastLongHashTable.java | 15 +++++
.../fast/VectorMapJoinFastMultiKeyHashMap.java | 5 ++
.../VectorMapJoinFastMultiKeyHashMultiSet.java | 4 ++
.../fast/VectorMapJoinFastMultiKeyHashSet.java | 5 +-
.../fast/VectorMapJoinFastStringHashMap.java | 9 +++
.../VectorMapJoinFastStringHashMultiSet.java | 8 +++
.../fast/VectorMapJoinFastStringHashSet.java | 8 +++
.../fast/VectorMapJoinFastTableContainer.java | 16 +++++-
.../fast/VectorMapJoinFastValueStore.java | 8 ++-
.../hashtable/VectorMapJoinHashTable.java | 3 +-
.../VectorMapJoinOptimizedHashSet.java | 5 ++
.../VectorMapJoinOptimizedHashTable.java | 9 +++
.../VectorMapJoinOptimizedStringHashSet.java | 8 +++
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 3 +-
.../hive/ql/optimizer/MapJoinProcessor.java | 8 +--
.../calcite/translator/HiveOpConverter.java | 2 +-
.../physical/GenMRSkewJoinProcessor.java | 3 +-
.../physical/GenSparkSkewJoinProcessor.java | 3 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 17 +++++-
.../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 12 ++--
.../ql/udf/generic/GenericUDAFComputeStats.java | 22 ++++----
.../TestMapJoinMemoryExhaustionHandler.java | 4 +-
.../apache/hadoop/hive/serde2/WriteBuffers.java | 25 ++++++++-
.../hadoop/hive/ql/util/JavaDataModel.java | 26 ++++-----
73 files changed, 509 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
new file mode 100644
index 0000000..36ae56f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+/**
+ * Interface that can be used to provide size estimates based on data structures held in memory for an object instance.
+ */
+public interface MemoryEstimate {
+ /**
+ * Returns estimated memory size based on {@link org.apache.hadoop.hive.ql.util.JavaDataModel}
+ *
+ * @return estimated memory size in bytes
+ */
+ long getEstimatedMemorySize();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ea8485d..84398c6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3161,6 +3161,10 @@ public class HiveConf extends Configuration {
"hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" +
"conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" +
"executors and configured max concurrency."),
+ LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL("hive.llap.mapjoin.memory.monitor.check.interval", 100000L,
+ "Check memory usage of mapjoin hash tables after every interval of this many rows. If map join hash table\n" +
+ "memory usage exceeds (hive.auto.convert.join.noconditionaltask.size * hive.hash.table.inflation.factor)\n" +
+ "when running in LLAP, tasks will get killed and not retried. Set the value to 0 to disable this feature."),
LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
"Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
"executors in llap daemon, it would be set to number of executors at runtime.",
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index ff6e7ce..6cf8dbb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -116,7 +116,7 @@ public class IncrementalObjectSizeEstimator {
addToProcessing(byType, stack, fieldObj, fieldClass);
}
}
- estimator.directSize = JavaDataModel.alignUp(
+ estimator.directSize = (int) JavaDataModel.alignUp(
estimator.directSize, memoryModel.memoryAlign());
}
}
@@ -454,7 +454,7 @@ public class IncrementalObjectSizeEstimator {
if (len != 0) {
int elementSize = getPrimitiveSize(e.field.getType().getComponentType());
arraySize += elementSize * len;
- arraySize = JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign());
+ arraySize = (int) JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign());
}
referencedSize += arraySize;
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
index 4393c3b..46cbb5b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
@@ -471,7 +471,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
index 7468c2f..2261e1b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
@@ -442,7 +442,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
index 57b7ea5..58d2d22 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
@@ -458,7 +458,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
index 749e97e..515692e 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
@@ -441,7 +441,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
index 9dfc147..c210e4c 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
@@ -81,7 +81,7 @@ public class <ClassName> extends VectorAggregateExpression {
@Override
public int getVariableSize() {
JavaDataModel model = JavaDataModel.get();
- return model.lengthForByteArrayOfSize(bytes.length);
+ return (int) model.lengthForByteArrayOfSize(bytes.length);
}
@Override
@@ -388,7 +388,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
index 32ecb34..074aefd 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
@@ -443,7 +443,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
index bd0f14d..a89ae0a 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
@@ -433,7 +433,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object(),
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
index dc9d4b1..1e3516b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
@@ -513,7 +513,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
index 01062a9..b3ec7e9 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
@@ -467,7 +467,7 @@ public class <ClassName> extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
@@ -488,4 +488,4 @@ public class <ClassName> extends VectorAggregateExpression {
public void setInputExpression(VectorExpression inputExpression) {
this.inputExpression = inputExpression;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
new file mode 100644
index 0000000..4ad4f98
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+/**
+ * When this Error is thrown, the task should not be retried.
+ */
+public class MapJoinMemoryExhaustionError extends Error {
+ private static final long serialVersionUID = 3678353959830506881L;
+ public MapJoinMemoryExhaustionError(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
deleted file mode 100644
index dbe00b6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.mapjoin;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-
-
-public class MapJoinMemoryExhaustionException extends HiveException {
- private static final long serialVersionUID = 3678353959830506881L;
- public MapJoinMemoryExhaustionException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
index 7fc3226..d5e81e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
@@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler {
*
* @param tableContainerSize currently table container size
* @param numRows number of rows processed
- * @throws MapJoinMemoryExhaustionException
+ * @throws MapJoinMemoryExhaustionError
*/
public void checkMemoryStatus(long tableContainerSize, long numRows)
- throws MapJoinMemoryExhaustionException {
+ throws MapJoinMemoryExhaustionError {
long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
double percentage = (double) usedMemory / (double) maxHeapSize;
String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
+ tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage);
console.printInfo(msg);
if(percentage > maxMemoryUsage) {
- throw new MapJoinMemoryExhaustionException(msg);
+ throw new MapJoinMemoryExhaustionError(msg);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 595d1bd..c5d4f9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -385,7 +384,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
+ Utilities.showTime(elapsed) + " sec.");
} catch (Throwable throwable) {
if (throwable instanceof OutOfMemoryError
- || (throwable instanceof MapJoinMemoryExhaustionException)) {
+ || (throwable instanceof MapJoinMemoryExhaustionError)) {
l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
return 3;
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 04e24bd..360b639 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
* Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked
* and there's very little in common left save for quadratic probing (and that with some changes).
*/
-public final class BytesBytesMultiHashMap {
+public final class BytesBytesMultiHashMap implements MemoryEstimate {
public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class);
/*
@@ -521,7 +523,18 @@ public final class BytesBytesMultiHashMap {
* @return number of bytes
*/
public long memorySize() {
- return writeBuffers.size() + refs.length * 8 + 100;
+ return getEstimatedMemorySize();
+ }
+
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ size += writeBuffers.getEstimatedMemorySize();
+ size += jdm.lengthForLongArrayOfSize(refs.length);
+ // 11 primitive1 fields, 2 refs above with alignment
+ size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign());
+ return size;
}
public void seal() {
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
index a3bccc6..adf1a90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable;
public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
private static final long serialVersionUID = 1L;
protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class);
-
+ private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L;
// default threshold for using main memory based HashMap
private static final int THRESHOLD = 1000000;
private static final float LOADFACTOR = 0.75f;
@@ -140,6 +140,14 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Ser
return new GetAdaptor(keyTypeFromLoader);
}
+ @Override
+ public long getEstimatedMemorySize() {
+ // TODO: Keys and values are Object[] which can be eagerly or lazily deserialized. To accurately
+ // estimate the entry size, every object in a key or value would have to implement the MemoryEstimate
+ // interface, which is very intrusive. So a default entry size is assumed here.
+ return size() * DEFAULT_HASHMAP_ENTRY_SIZE;
+ }
+
private class GetAdaptor implements ReusableGetAdaptor {
private Object[] currentKey;
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 04e89e8..6523f00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -118,6 +118,11 @@ public class HybridHashTableContainer
private final String spillLocalDirs;
+ @Override
+ public long getEstimatedMemorySize() {
+ return memoryUsed;
+ }
+
/**
* This class encapsulates the triplet together since they are closely related to each other
* The triplet: hashmap (either in memory or on disk), small table container, big table container
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index c86e5f5..014d17a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -72,6 +74,11 @@ public class MapJoinBytesTableContainer
implements MapJoinTableContainer, MapJoinTableContainerDirectAccess {
private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class);
+ // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size, every
+ // object inspector would have to implement the MemoryEstimate interface, which is a lot of change with little
+ // benefit compared to writing an instrumentation agent for object size estimation.
+ public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L;
+
private final BytesBytesMultiHashMap hashMap;
/** The OI used to deserialize values. We never deserialize keys. */
private LazyBinaryStructObjectInspector internalValueOi;
@@ -147,7 +154,7 @@ public class MapJoinBytesTableContainer
this.notNullMarkers = notNullMarkers;
}
- public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource {
+ public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate {
void setKeyValue(Writable key, Writable val) throws SerDeException;
/** Get hash value from the key. */
int getHashFromKey() throws SerDeException;
@@ -216,6 +223,22 @@ public class MapJoinBytesTableContainer
public int getHashFromKey() throws SerDeException {
throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer");
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ size += keySerDe == null ? 0 : jdm.object();
+ size += valSerDe == null ? 0 : jdm.object();
+ size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+ size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+ size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+ size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+ size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object();
+ size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object();
+ size += jdm.primitive1();
+ return size;
+ }
}
static class LazyBinaryKvWriter implements KeyValueHelper {
@@ -319,6 +342,15 @@ public class MapJoinBytesTableContainer
aliasFilter &= filterGetter.getShort();
return aliasFilter;
}
+
+ /**
+ * Returns a fixed-size estimate of this writer: four object slots plus one primitive slot,
+ * independent of any data written through it.
+ *
+ * @return estimated size in bytes
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ // NOTE(review): the hard-coded 4 presumably mirrors this writer's reference fields; keep it
+ // in sync with the field list if fields are added or removed.
+ size += (4 * jdm.object());
+ size += jdm.primitive1();
+ return size;
+ }
}
/*
@@ -361,6 +393,15 @@ public class MapJoinBytesTableContainer
int keyLength = key.getLength();
return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
}
+
+ /**
+ * Estimates the memory retained by this writer: one object for each of the key and value
+ * holders (charged even when the field is null) plus, for each non-null holder, the buffer
+ * capacity reported by getCapacity() rather than the current length.
+ *
+ * @return estimated size in bytes
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ size += jdm.object() + (key == null ? 0 : key.getCapacity());
+ size += jdm.object() + (val == null ? 0 : val.getCapacity());
+ return size;
+ }
}
@Override
@@ -768,4 +809,19 @@ public class MapJoinBytesTableContainer
public int size() {
return hashMap.size();
}
+
+ /**
+ * Estimates the total memory held by this table container: the underlying hash map, the
+ * direct and regular write helpers, the sort-order and null-marker arrays, one list header,
+ * and a single flat object-inspector allowance.
+ *
+ * @return estimated size in bytes
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ size += hashMap.getEstimatedMemorySize();
+ size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize();
+ size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize();
+ size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length);
+ size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length);
+ size += notNullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length);
+ // NOTE(review): which list field this header stands for is not visible here -- confirm.
+ size += jdm.arrayList(); // empty list
+ // NOTE(review): presumably the allowance for internalValueOi -- confirm.
+ size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+ return size;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 6d71fef..5ca5ff6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
-public interface MapJoinTableContainer {
+public interface MapJoinTableContainer extends MemoryEstimate {
/**
* Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures
* for each key. "Old" HashMapWrapper will still create/retrieve new objects for java HashMap;
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7b13e90..7011d23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -146,7 +147,20 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
}
nwayConf.setNumberOfPartitions(numPartitions);
}
-
+ final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+ final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+ HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+ final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ long numEntries = 0;
+ long noCondTaskSize = desc.getNoConditionalTaskSize();
+ boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+ if (!doMemCheck) {
+ LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+ "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+ } else {
+ LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+ noCondTaskSize, inflationFactor);
+ }
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable()) {
continue;
@@ -205,12 +219,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
tableContainer = new HashMapWrapper(hconf, keyCount);
}
- LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName());
+ LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
tableContainer.setSerde(keyCtx, valCtx);
while (kvReader.next()) {
- tableContainer.putRow(
- (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue());
+ tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
+ numEntries++;
+ if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+ final long estMemUsage = tableContainer.getEstimatedMemorySize();
+ final long threshold = (long) (inflationFactor * noCondTaskSize);
+ // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+ // available for container/executor
+ final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+ if (estMemUsage > effectiveThreshold) {
+ String msg = "Hash table loading exceeded memory limits." +
+ " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+ " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+ " effectiveThreshold: " + effectiveThreshold;
+ LOG.error(msg);
+ throw new MapJoinMemoryExhaustionError(msg);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
+ "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+ }
+ }
+ }
}
tableContainer.seal();
LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 486d43a..4242262 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.tez.runtime.api.TaskFailureType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import com.google.common.base.Throwables;
+
/**
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -189,8 +193,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
} catch (Throwable t) {
originalThrowable = t;
} finally {
- if (originalThrowable != null && originalThrowable instanceof Error) {
- LOG.error(StringUtils.stringifyException(originalThrowable));
+ // Report the task as FATAL when the throwable, or its root cause, is an Error
+ // (e.g. a wrapped MapJoinMemoryExhaustionError): re-attempting will not recover.
+ if (originalThrowable != null && (originalThrowable instanceof Error ||
+ Throwables.getRootCause(originalThrowable) instanceof Error)) {
+ // Pass the Throwable itself as the last argument so SLF4J logs its stack trace; a String
+ // argument with no {} placeholder in the message would be silently dropped.
+ LOG.error("Cannot recover from this FATAL error", originalThrowable);
+ getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
+ "Cannot recover from this error");
throw new RuntimeException(originalThrowable);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
index 630046d..84128e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
@@ -57,7 +57,7 @@ public class VectorAggregationBufferBatch {
/**
* Memory consumed by a set of aggregation buffers
*/
- private int aggregatorsFixedSize;
+ private long aggregatorsFixedSize;
/**
* Array of indexes for aggregators that have variable size
@@ -76,7 +76,7 @@ public class VectorAggregationBufferBatch {
* Returns the fixed size consumed by the aggregation buffers
* @return
*/
- public int getAggregatorsFixedSize() {
+ public long getAggregatorsFixedSize() {
return aggregatorsFixedSize;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 5b4c7c3..30916a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -286,7 +286,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
/**
* Total per hashtable entry fixed memory (does not depend on key/agg values).
*/
- private int fixedHashEntrySize;
+ private long fixedHashEntrySize;
/**
* Average per hashtable entry variable size memory (depends on key/agg value).
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
index 0866f63..7ab4473 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
@@ -52,7 +52,7 @@ public abstract class VectorAggregateExpression implements Serializable {
public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException;
public abstract ObjectInspector getOutputObjectInspector();
- public abstract int getAggregationBufferFixedSize();
+ public abstract long getAggregationBufferFixedSize();
public boolean hasVariableSize() {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
index 74e25ae..4aac9d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
@@ -492,7 +492,7 @@ public class VectorUDAFAvgDecimal extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
index 483d9dc..365dcf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
@@ -464,7 +464,7 @@ public class VectorUDAFAvgTimestamp extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
index 2139eae..52b05ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -383,7 +383,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
if (bitSetSize < 0) {
// Not pretty, but we need a way to get the size
try {
@@ -396,7 +396,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression {
// BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
JavaDataModel model = JavaDataModel.get();
- int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+ long bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
model.memoryAlign());
return JavaDataModel.alignUp(
model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
index d2446d5..b986eb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -339,7 +339,7 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
if (aggBufferSize < 0) {
// Not pretty, but we need a way to get the size
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
index 494febc..cadb6dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
@@ -259,7 +259,7 @@ public class VectorUDAFCount extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
index dec88cb..c489f8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
@@ -385,7 +385,7 @@ public class VectorUDAFCountMerge extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
index 337ba0a..3b66030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
@@ -142,7 +142,7 @@ public class VectorUDAFCountStar extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
index 8cd3506..5388d37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFStdPopTimestamp extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
index 61d6977..1769dc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFStdSampTimestamp extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
index b10f66f..a37e3f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
@@ -431,7 +431,7 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object(),
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
index 2709b07..61cdeaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFVarPopTimestamp extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
index 03dce1e..c375461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFVarSampTimestamp extends VectorAggregateExpression {
}
@Override
- public int getAggregationBufferFixedSize() {
+ public long getAggregationBufferFixedSize() {
JavaDataModel model = JavaDataModel.get();
return JavaDataModel.alignUp(
model.object() +
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 6242daf..b5eab8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -107,4 +107,9 @@ public abstract class VectorMapJoinFastBytesHashMap
// Share the same write buffers with our value store.
keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers());
}
+
+ /**
+ * Estimated size of this hash map: the superclass estimate plus the value store and the
+ * key store estimates.
+ */
+ // NOTE(review): keyStore is constructed on valueStore.writeBuffers() (the stores share write
+ // buffers), so if both stores count those buffers in their own estimates they are counted
+ // twice here -- confirm against the key/value store implementations.
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index 1a41961..e779762 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -97,4 +97,9 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
}
+
+ /**
+ * Estimated size of this hash multi-set: the superclass estimate plus the key store estimate.
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 331867c..d493319 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -84,4 +84,9 @@ public abstract class VectorMapJoinFastBytesHashSet
keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
}
+
+ /**
+ * Estimated size of this hash set: the superclass estimate plus the key store estimate.
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index b93e977..10bc902 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
@@ -218,4 +219,9 @@ public abstract class VectorMapJoinFastBytesHashTable
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
allocateBucketArray();
}
+
+ /**
+ * Estimated size of this bytes hash table: the superclass estimate plus the slotTriples
+ * long array, sized by its current length.
+ */
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize() + JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index b6db3bc..1f182ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
@@ -88,4 +89,10 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
public int size() {
return keysAssigned;
}
+
+ /**
+ * Fixed-size estimate of this hash table's own fields: ten primitive1() slots plus one
+ * primitive2() slot, aligned up to the data model's memory alignment.
+ */
+ // NOTE(review): the counts presumably mirror this class's primitive fields, and no object
+ // header (jdm.object()) is included -- confirm both against the field list.
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ return JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign());
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 49ecdd1..b015e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
@@ -68,6 +68,21 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
Map<Integer, String> parentToInput = desc.getParentToInput();
Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+    // Hash-table memory monitoring is only active in LLAP mode, and only when the inflation
+    // factor, the no-conditional-task size and the check interval are all positive.
+    final float inflationFactor =
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final long memoryCheckInterval =
+        HiveConf.getLongVar(hconf, HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final String executionMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
+    final boolean isLlap = "llap".equals(executionMode);
+    // Row counter declared outside the per-table loop, so it keeps counting across small tables.
+    long numEntries = 0;
+    long noCondTaskSize = desc.getNoConditionalTaskSize();
+    final boolean limitsConfigured =
+        inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    boolean doMemCheck = isLlap && limitsConfigured;
+    if (doMemCheck) {
+      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+          noCondTaskSize, inflationFactor);
+    } else {
+      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+          "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+    }
+
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable()) {
continue;
@@ -93,15 +108,41 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        // Parameterized SLF4J logging: the message is only formatted when INFO is enabled.
+        LOG.info("Using vectorMapJoinFastTableContainer: {}",
+            vectorMapJoinFastTableContainer.getClass().getSimpleName());
+
vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
while (kvReader.next()) {
vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
(BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          // Sample the estimated hash table size once every memoryCheckInterval rows. doMemCheck
+          // guarantees memoryCheckInterval > 0, so the remainder is safe. numEntries >= 1 here, so a
+          // zero remainder already implies numEntries >= memoryCheckInterval; no separate check is needed.
+          if (doMemCheck && (numEntries % memoryCheckInterval) == 0) {
+            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            final long threshold = (long) (inflationFactor * noCondTaskSize);
+            // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+            // available for container/executor
+            final long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * desc.getMaxMemoryAvailable());
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
+                  " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+                  " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+                  " effectiveThreshold: " + effectiveThreshold;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            }
+            // Parameterized logging already defers formatting, so no isInfoEnabled() guard is needed.
+            LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
+                "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+          }
}
vectorMapJoinFastTableContainer.seal();
- mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer;
-
+        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+        // Use the simple class name, matching the "Using vectorMapJoinFastTableContainer" log line.
+        LOG.info("Finished loading hashtable using {}. Small table position: {}",
+            vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index be51693..3e9ff84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.serde2.WriteBuffers;
// Optimized for sequential key lookup.
-public class VectorMapJoinFastKeyStore {
+public class VectorMapJoinFastKeyStore implements MemoryEstimate {
private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName());
@@ -165,4 +166,12 @@ public class VectorMapJoinFastKeyStore {
this.writeBuffers = writeBuffers;
unsafeReadPos = new WriteBuffers.Position();
}
+
+  /**
+   * Estimates the memory used by this key store: its write buffers plus its unsafe read
+   * position, each counted as zero when it has not been allocated.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    long total = 0;
+    if (writeBuffers != null) {
+      total += writeBuffers.getEstimatedMemorySize();
+    }
+    if (unsafeReadPos != null) {
+      total += unsafeReadPos.getEstimatedMemorySize();
+    }
+    return total;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 6fe98f9..d4847b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -37,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class VectorMapJoinFastLongHashMap
extends VectorMapJoinFastLongHashTable
- implements VectorMapJoinLongHashMap {
+ implements VectorMapJoinLongHashMap, MemoryEstimate {
public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class);
@@ -112,4 +114,9 @@ public class VectorMapJoinFastLongHashMap
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
valueStore = new VectorMapJoinFastValueStore(writeBuffersSize);
}
+
+  /**
+   * Returns the long-key hash table's estimate plus the estimate of the value store.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long keyTableBytes = super.getEstimatedMemorySize();
+    final long valueStoreBytes = valueStore.getEstimatedMemorySize();
+    return keyTableBytes + valueStoreBytes;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 9140aee..566cfa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -100,4 +100,9 @@ public class VectorMapJoinFastLongHashMultiSet
super(minMaxEnabled, isOuterJoin, hashTableKeyType,
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+  /**
+   * Returns the estimate of the underlying long-key hash table; this class contributes
+   * nothing extra to the estimate.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long baseTableBytes = super.getEstimatedMemorySize();
+    return baseTableBytes;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d3efb11..fb7ae62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -96,4 +96,9 @@ public class VectorMapJoinFastLongHashSet
super(minMaxEnabled, isOuterJoin, hashTableKeyType,
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+  /**
+   * Returns the estimate of the underlying long-key hash table; this class contributes
+   * nothing extra to the estimate.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long baseTableBytes = super.getEstimatedMemorySize();
+    return baseTableBytes;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 8bfa07c..54e667c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -280,4 +281,18 @@ public abstract class VectorMapJoinFastLongHashTable
min = Long.MAX_VALUE;
max = Long.MIN_VALUE;
}
+
+  /**
+   * Estimates the memory used by this long-key hash table. The estimate is the sum of:
+   * the parent table's estimate, the slotPairs long array, two primitive2() and two primitive1()
+   * units, one object() unit, and a fixed 16KB allowance for the key deserializer.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final JavaDataModel model = JavaDataModel.get();
+    long total = super.getEstimatedMemorySize();
+    total += (slotPairs == null) ? 0 : model.lengthForLongArrayOfSize(slotPairs.length);
+    // NOTE(review): presumably the min/max longs and two int-sized fields -- confirm against fields.
+    total += (2 * model.primitive2()) + (2 * model.primitive1());
+    total += model.object();
+    // Fixed 16KB allowance for keyBinarySortableDeserializeRead: implementing MemoryEstimate down
+    // its object graph would be a deep rabbit hole, and its overhead is constant anyway.
+    final long deserializerAllowanceBytes = 16 * 1024L;
+    return total + deserializerAllowanceBytes;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
index add4788..eb08aa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
@@ -53,4 +53,9 @@ public class VectorMapJoinFastMultiKeyHashMap
int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) {
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+  /**
+   * Returns the estimate of the underlying multi-key bytes hash table; this class contributes
+   * nothing extra to the estimate.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long baseTableBytes = super.getEstimatedMemorySize();
+    return baseTableBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
index faefdbb..56964bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
@@ -52,4 +52,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+  /**
+   * Returns the estimate of the underlying multi-key bytes hash table; this class contributes
+   * nothing extra to the estimate.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long baseTableBytes = super.getEstimatedMemorySize();
+    return baseTableBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
index 5328910..46bafe0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
@@ -52,5 +52,8 @@ public class VectorMapJoinFastMultiKeyHashSet
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
-
+
+  /**
+   * Returns the estimate of the underlying multi-key bytes hash table; this class contributes
+   * nothing extra to the estimate.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    final long baseTableBytes = super.getEstimatedMemorySize();
+    return baseTableBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
index f13034f..d04590a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
@@ -43,4 +43,13 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+  /**
+   * Returns the bytes hash map's estimate plus a fixed 16KB allowance for {@code stringCommon}.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    // Fixed 16KB allowance for stringCommon: implementing MemoryEstimate down its object graph
+    // would be a deep rabbit hole, and its overhead is constant anyway.
+    final long stringCommonAllowanceBytes = 16 * 1024L;
+    return super.getEstimatedMemorySize() + stringCommonAllowanceBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
index 53ad7b4..b24bfdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+  /**
+   * Returns the bytes hash multi-set's estimate plus a fixed 16KB allowance for
+   * {@code stringCommon}.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    // Fixed 16KB allowance for stringCommon: implementing MemoryEstimate down its object graph
+    // would be a deep rabbit hole, and its overhead is constant anyway.
+    final long stringCommonAllowanceBytes = 16 * 1024L;
+    return super.getEstimatedMemorySize() + stringCommonAllowanceBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
index 723c729..75fae25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+  /**
+   * Returns the bytes hash set's estimate plus a fixed 16KB allowance for {@code stringCommon}.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    // Fixed 16KB allowance for stringCommon: implementing MemoryEstimate down its object graph
+    // would be a deep rabbit hole, and its overhead is constant anyway.
+    final long stringCommonAllowanceBytes = 16 * 1024L;
+    return super.getEstimatedMemorySize() + stringCommonAllowanceBytes;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 05f1cf1..2fe4b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
-import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
/**
* HashTableLoader for Tez constructs the hashtable from records read from
@@ -46,7 +45,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
*/
public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer {
- private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName());
private final MapJoinDesc desc;
private final Configuration hconf;
@@ -219,6 +218,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
}
+  /**
+   * Estimates the memory used by this container. The estimate is the wrapped fast hash table's
+   * estimate plus four primitive1() units, two object() units and one primitive2() unit.
+   * NOTE(review): presumably the container's own fields -- confirm against the field list.
+   */
@Override
+  public long getEstimatedMemorySize() {
+    final JavaDataModel model = JavaDataModel.get();
+    final long hashTableBytes = vectorMapJoinFastHashTable.getEstimatedMemorySize();
+    final long ownFieldBytes = (4 * model.primitive1()) + (2 * model.object()) + model.primitive2();
+    return hashTableBytes + ownFieldBytes;
+  }
+
+ @Override
public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx)
throws SerDeException {
// Do nothing in this case.
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
index f9c5b34..3cd06e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
@@ -30,7 +31,7 @@ import com.google.common.base.Preconditions;
// Supports random access.
-public class VectorMapJoinFastValueStore {
+public class VectorMapJoinFastValueStore implements MemoryEstimate {
private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName());
@@ -113,6 +114,11 @@ public class VectorMapJoinFastValueStore {
return writeBuffers;
}
+  /**
+   * Returns the estimated size of the value write buffers, or 0 when none have been allocated.
+   */
+  @Override
+  public long getEstimatedMemorySize() {
+    if (writeBuffers == null) {
+      return 0;
+    }
+    return writeBuffers.getEstimatedMemorySize();
+  }
+
public static class HashMapResult extends VectorMapJoinHashMapResult {
private VectorMapJoinFastValueStore valueStore;
http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index c7e585c..9cc9ad4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
import java.io.IOException;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
@@ -28,7 +29,7 @@ import org.apache.hadoop.io.BytesWritable;
* Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or
* hash set).
*/
-public interface VectorMapJoinHashTable {
+public interface VectorMapJoinHashTable extends MemoryEstimate {
/*