You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:52 UTC
[05/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 49ecdd1..b015e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
@@ -68,6 +68,21 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
Map<Integer, String> parentToInput = desc.getParentToInput();
Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+ final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+ final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+ HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+ final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ long numEntries = 0;
+ long noCondTaskSize = desc.getNoConditionalTaskSize();
+ boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+ if (!doMemCheck) {
+ LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+ "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+ } else {
+ LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+ noCondTaskSize, inflationFactor);
+ }
+
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable()) {
continue;
@@ -93,15 +108,41 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+ LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+
vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
while (kvReader.next()) {
vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
(BytesWritable)kvReader.getCurrentValue());
+ numEntries++;
+ if (doMemCheck && numEntries >= memoryCheckInterval) {
+ if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+ final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+ final long threshold = (long) (inflationFactor * noCondTaskSize);
+ // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+ // available for container/executor
+ final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+ if (estMemUsage > effectiveThreshold) {
+ String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
+ " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+ " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+ " effectiveThreshold: " + effectiveThreshold;
+ LOG.error(msg);
+ throw new MapJoinMemoryExhaustionError(msg);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
+ "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+ }
+ }
+ }
+ }
}
vectorMapJoinFastTableContainer.seal();
- mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer;
-
+ mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+ LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
+ ". Small table position: " + pos);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index be51693..3e9ff84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.serde2.WriteBuffers;
// Optimized for sequential key lookup.
-public class VectorMapJoinFastKeyStore {
+public class VectorMapJoinFastKeyStore implements MemoryEstimate {
private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName());
@@ -165,4 +166,12 @@ public class VectorMapJoinFastKeyStore {
this.writeBuffers = writeBuffers;
unsafeReadPos = new WriteBuffers.Position();
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ long size = 0;
+ size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+ size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize();
+ return size;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 6fe98f9..d4847b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -37,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class VectorMapJoinFastLongHashMap
extends VectorMapJoinFastLongHashTable
- implements VectorMapJoinLongHashMap {
+ implements VectorMapJoinLongHashMap, MemoryEstimate {
public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class);
@@ -112,4 +114,9 @@ public class VectorMapJoinFastLongHashMap
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
valueStore = new VectorMapJoinFastValueStore(writeBuffersSize);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 9140aee..566cfa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -100,4 +100,9 @@ public class VectorMapJoinFastLongHashMultiSet
super(minMaxEnabled, isOuterJoin, hashTableKeyType,
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d3efb11..fb7ae62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -96,4 +96,9 @@ public class VectorMapJoinFastLongHashSet
super(minMaxEnabled, isOuterJoin, hashTableKeyType,
initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 8bfa07c..54e667c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -280,4 +281,18 @@ public abstract class VectorMapJoinFastLongHashTable
min = Long.MAX_VALUE;
max = Long.MIN_VALUE;
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = super.getEstimatedMemorySize();
+ size += slotPairs == null ? 0 : jdm.lengthForLongArrayOfSize(slotPairs.length);
+ size += (2 * jdm.primitive2());
+ size += (2 * jdm.primitive1());
+ size += jdm.object();
+ // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabit hole is deep to implement
+ // MemoryEstimate interface, also it is constant overhead
+ size += (16 * 1024L);
+ return size;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
index add4788..eb08aa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
@@ -53,4 +53,9 @@ public class VectorMapJoinFastMultiKeyHashMap
int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) {
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
index faefdbb..56964bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
@@ -52,4 +52,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
index 5328910..46bafe0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
@@ -52,5 +52,8 @@ public class VectorMapJoinFastMultiKeyHashSet
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
}
-
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
index f13034f..d04590a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
@@ -43,4 +43,13 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ long size = 0;
+ // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+ // MemoryEstimate interface, also it is constant overhead
+ size += (16 * 1024L);
+ return super.getEstimatedMemorySize() + size;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
index 53ad7b4..b24bfdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+ // MemoryEstimate interface, also it is constant overhead
+ long size = (16 * 1024L);
+ return super.getEstimatedMemorySize() + size;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
index 723c729..75fae25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe
super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+ // MemoryEstimate interface, also it is constant overhead
+ long size = (16 * 1024L);
+ return super.getEstimatedMemorySize() + size;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 05f1cf1..2fe4b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
-import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
/**
* HashTableLoader for Tez constructs the hashtable from records read from
@@ -46,7 +45,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
*/
public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer {
- private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName());
private final MapJoinDesc desc;
private final Configuration hconf;
@@ -219,6 +218,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
}
@Override
+ public long getEstimatedMemorySize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long size = 0;
+ size += vectorMapJoinFastHashTable.getEstimatedMemorySize();
+ size += (4 * jdm.primitive1());
+ size += (2 * jdm.object());
+ size += (jdm.primitive2());
+ return size;
+ }
+
+ @Override
public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx)
throws SerDeException {
// Do nothing in this case.
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
index f9c5b34..3cd06e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
@@ -30,7 +31,7 @@ import com.google.common.base.Preconditions;
// Supports random access.
-public class VectorMapJoinFastValueStore {
+public class VectorMapJoinFastValueStore implements MemoryEstimate {
private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName());
@@ -113,6 +114,11 @@ public class VectorMapJoinFastValueStore {
return writeBuffers;
}
+ @Override
+ public long getEstimatedMemorySize() {
+ return writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+ }
+
public static class HashMapResult extends VectorMapJoinHashMapResult {
private VectorMapJoinFastValueStore valueStore;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index c7e585c..9cc9ad4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
import java.io.IOException;
+import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
@@ -28,7 +29,7 @@ import org.apache.hadoop.io.BytesWritable;
* Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or
* hash set).
*/
-public interface VectorMapJoinHashTable {
+public interface VectorMapJoinHashTable extends MemoryEstimate {
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
index 93a89d7..1560807 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
@@ -75,4 +75,9 @@ public class VectorMapJoinOptimizedHashSet
MapJoinTableContainer originalTableContainer, ReusableGetAdaptor hashMapRowGetter) {
super(originalTableContainer, hashMapRowGetter);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ return super.getEstimatedMemorySize();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
index 5fe7861..5275e1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -96,4 +97,12 @@ public abstract class VectorMapJoinOptimizedHashTable implements VectorMapJoinHa
public int size() {
return originalTableContainer.size();
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ long size = 0;
+ size += originalTableContainer == null ? 0 : originalTableContainer.getEstimatedMemorySize();
+ size += (2 * JavaDataModel.get().object());
+ return size;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
index f921b9c..4b46ce0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
@@ -60,4 +60,12 @@ public class VectorMapJoinOptimizedStringHashSet
super(originalTableContainer, hashMapRowGetter);
stringCommon = new VectorMapJoinOptimizedStringCommon(isOuterJoin);
}
+
+ @Override
+ public long getEstimatedMemorySize() {
+ // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+ // MemoryEstimate interface, also it is constant overhead
+ long size = (16 * 1024L);
+ return super.getEstimatedMemorySize() + size;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index 42ca4b7..fc5aea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -102,36 +102,29 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
//---------------------------------------------------------------------------
// Whether there is to be a tag added to the end of each key and the tag value.
- private transient boolean reduceSkipTag;
- private transient byte reduceTagByte;
+ protected transient boolean reduceSkipTag;
+ protected transient byte reduceTagByte;
// Binary sortable key serializer.
protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite;
- // The serialized all null key and its hash code.
- private transient byte[] nullBytes;
- private transient int nullKeyHashCode;
-
// Lazy binary value serializer.
- private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+ protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
// This helper object serializes LazyBinary format reducer values from columns of a row
// in a vectorized row batch.
- private transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+ protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
// The output buffer used to serialize a value into.
- private transient Output valueOutput;
+ protected transient Output valueOutput;
// The hive key and bytes writable value needed to pass the key and value to the collector.
- private transient HiveKey keyWritable;
- private transient BytesWritable valueBytesWritable;
+ protected transient HiveKey keyWritable;
+ protected transient BytesWritable valueBytesWritable;
// Where to write our key and value pairs.
private transient OutputCollector out;
- // The object that determines equal key series.
- protected transient VectorKeySeriesSerialized serializedKeySeries;
-
private transient long numRows = 0;
private transient long cntr = 1;
private transient long logEveryNRows = 0;
@@ -158,6 +151,8 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
VectorizationContext vContext, OperatorDesc conf) throws HiveException {
this(ctx);
+ LOG.info("VectorReduceSinkCommonOperator constructor");
+
ReduceSinkDesc desc = (ReduceSinkDesc) conf;
this.conf = desc;
vectorDesc = (VectorReduceSinkDesc) desc.getVectorDesc();
@@ -247,6 +242,46 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
+ if (isLogDebugEnabled) {
+ LOG.debug("useUniformHash " + vectorReduceSinkInfo.getUseUniformHash());
+
+ LOG.debug("reduceSinkKeyColumnMap " +
+ (vectorReduceSinkInfo.getReduceSinkKeyColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnMap())));
+ LOG.debug("reduceSinkKeyTypeInfos " +
+ (vectorReduceSinkInfo.getReduceSinkKeyTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyTypeInfos())));
+ LOG.debug("reduceSinkKeyColumnVectorTypes " +
+ (vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes())));
+ LOG.debug("reduceSinkKeyExpressions " +
+ (vectorReduceSinkInfo.getReduceSinkKeyExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyExpressions())));
+
+ LOG.debug("reduceSinkValueColumnMap " +
+ (vectorReduceSinkInfo.getReduceSinkValueColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnMap())));
+ LOG.debug("reduceSinkValueTypeInfos " +
+ (vectorReduceSinkInfo.getReduceSinkValueTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueTypeInfos())));
+ LOG.debug("reduceSinkValueColumnVectorTypes " +
+ (vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes())));
+ LOG.debug("reduceSinkValueExpressions " +
+ (vectorReduceSinkInfo.getReduceSinkValueExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueExpressions())));
+
+ LOG.debug("reduceSinkBucketColumnMap " +
+ (vectorReduceSinkInfo.getReduceSinkBucketColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnMap())));
+ LOG.debug("reduceSinkBucketTypeInfos " +
+ (vectorReduceSinkInfo.getReduceSinkBucketTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketTypeInfos())));
+ LOG.debug("reduceSinkBucketColumnVectorTypes " +
+ (vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes())));
+ LOG.debug("reduceSinkBucketExpressions " +
+ (vectorReduceSinkInfo.getReduceSinkBucketExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketExpressions())));
+
+ LOG.debug("reduceSinkPartitionColumnMap " +
+ (vectorReduceSinkInfo.getReduceSinkPartitionColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnMap())));
+ LOG.debug("reduceSinkPartitionTypeInfos " +
+ (vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos())));
+ LOG.debug("reduceSinkPartitionColumnVectorTypes " +
+ (vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes())));
+ LOG.debug("reduceSinkPartitionExpressions " +
+ (vectorReduceSinkInfo.getReduceSinkPartitionExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionExpressions())));
+ }
+
if (LOG.isDebugEnabled()) {
// Determine the name of our map or reduce task for debug tracing.
BaseWork work = Utilities.getMapWork(hconf);
@@ -280,21 +315,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder,
columnNullMarker, columnNotNullMarker);
- // Create all nulls key.
- try {
- Output nullKeyOutput = new Output();
- keyBinarySortableSerializeWrite.set(nullKeyOutput);
- for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
- keyBinarySortableSerializeWrite.writeNull();
- }
- int nullBytesLength = nullKeyOutput.getLength();
- nullBytes = new byte[nullBytesLength];
- System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
- nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
- } catch (Exception e) {
- throw new HiveException(e);
- }
-
valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length);
valueVectorSerializeRow =
@@ -312,101 +332,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
batchCounter = 0;
}
- @Override
- public void process(Object row, int tag) throws HiveException {
-
- try {
- VectorizedRowBatch batch = (VectorizedRowBatch) row;
-
- batchCounter++;
-
- if (batch.size == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
- }
- return;
- }
-
- // Perform any key expressions. Results will go into scratch columns.
- if (reduceSinkKeyExpressions != null) {
- for (VectorExpression ve : reduceSinkKeyExpressions) {
- ve.evaluate(batch);
- }
- }
-
- // Perform any value expressions. Results will go into scratch columns.
- if (reduceSinkValueExpressions != null) {
- for (VectorExpression ve : reduceSinkValueExpressions) {
- ve.evaluate(batch);
- }
- }
-
- serializedKeySeries.processBatch(batch);
-
- boolean selectedInUse = batch.selectedInUse;
- int[] selected = batch.selected;
-
- int keyLength;
- int logical;
- int end;
- int batchIndex;
- do {
- if (serializedKeySeries.getCurrentIsAllNull()) {
-
- // Use the same logic as ReduceSinkOperator.toHiveKey.
- //
- if (tag == -1 || reduceSkipTag) {
- keyWritable.set(nullBytes, 0, nullBytes.length);
- } else {
- keyWritable.setSize(nullBytes.length + 1);
- System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
- keyWritable.get()[nullBytes.length] = reduceTagByte;
- }
- keyWritable.setDistKeyLength(nullBytes.length);
- keyWritable.setHashCode(nullKeyHashCode);
-
- } else {
-
- // One serialized key for 1 or more rows for the duplicate keys.
- // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength());
- // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength());
- keyLength = serializedKeySeries.getSerializedLength();
- if (tag == -1 || reduceSkipTag) {
- keyWritable.set(serializedKeySeries.getSerializedBytes(),
- serializedKeySeries.getSerializedStart(), keyLength);
- } else {
- keyWritable.setSize(keyLength + 1);
- System.arraycopy(serializedKeySeries.getSerializedBytes(),
- serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
- keyWritable.get()[keyLength] = reduceTagByte;
- }
- keyWritable.setDistKeyLength(keyLength);
- keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
- }
-
- logical = serializedKeySeries.getCurrentLogical();
- end = logical + serializedKeySeries.getCurrentDuplicateCount();
- do {
- batchIndex = (selectedInUse ? selected[logical] : logical);
-
- valueLazyBinarySerializeWrite.reset();
- valueVectorSerializeRow.serializeWrite(batch, batchIndex);
-
- valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
-
- collect(keyWritable, valueBytesWritable);
- } while (++logical < end);
-
- if (!serializedKeySeries.next()) {
- break;
- }
- } while (true);
-
- } catch (Exception e) {
- throw new HiveException(e);
- }
- }
-
protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
// Since this is a terminal operator, update counters explicitly -
// forward is not called
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
index 325f773..0bc1cd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
/*
* Specialized class for native vectorized reduce sink that is reducing on a single long key column.
*/
-public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkLongOperator extends VectorReduceSinkUniformHashOperator {
private static final long serialVersionUID = 1L;
private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
index 2027187..1cca94d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerialize
* Specialized class for native vectorized reduce sink that is reducing on multiple key columns
* (or a single non-long / non-string column).
*/
-public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkUniformHashOperator {
private static final long serialVersionUID = 1L;
private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
new file mode 100644
index 0000000..6312c44
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Native vectorized reduce sink operator that computes each row's reducer hash code by
+ * object hashing (ObjectInspectorUtils hash codes over the partition and bucket column
+ * values) rather than by a uniform hash of the serialized key.
+ *
+ * <p>Per-row hash code:
+ * <ul>
+ *   <li>partition columns present: bucket hash code of the partition column values;</li>
+ *   <li>no partition columns: next value of a fixed-seed random sequence;</li>
+ *   <li>bucketed: the value above {@code * 31 + bucketNumber}.</li>
+ * </ul>
+ * Keys are BinarySortable-serialized one row at a time; values are LazyBinary-serialized.
+ */
+public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkObjectHashOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected int[] reduceSinkBucketColumnMap;
+  protected TypeInfo[] reduceSinkBucketTypeInfos;
+
+  protected VectorExpression[] reduceSinkBucketExpressions;
+
+  protected int[] reduceSinkPartitionColumnMap;
+  protected TypeInfo[] reduceSinkPartitionTypeInfos;
+
+  protected VectorExpression[] reduceSinkPartitionExpressions;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  protected transient Output keyOutput;
+  protected transient VectorSerializeRow<BinarySortableSerializeWrite> keyVectorSerializeRow;
+
+  private transient boolean hasBuckets;
+  private transient int numBuckets;
+  private transient ObjectInspector[] bucketObjectInspectors;
+  private transient VectorExtractRow bucketVectorExtractRow;
+  private transient Object[] bucketFieldValues;
+
+  private transient boolean isPartitioned;
+  private transient ObjectInspector[] partitionObjectInspectors;
+  private transient VectorExtractRow partitionVectorExtractRow;
+  private transient Object[] partitionFieldValues;
+  private transient Random nonPartitionRandom;
+
+  /** Kryo ctor. */
+  protected VectorReduceSinkObjectHashOperator() {
+    super();
+  }
+
+  public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("VectorReduceSinkObjectHashOperator constructor vectorReduceSinkInfo "
+          + vectorReduceSinkInfo);
+    }
+
+    // This is the Object Hash class variation.
+    Preconditions.checkState(!vectorReduceSinkInfo.getUseUniformHash(),
+        "VectorReduceSinkObjectHashOperator requires useUniformHash == false");
+
+    reduceSinkBucketColumnMap = vectorReduceSinkInfo.getReduceSinkBucketColumnMap();
+    reduceSinkBucketTypeInfos = vectorReduceSinkInfo.getReduceSinkBucketTypeInfos();
+    reduceSinkBucketExpressions = vectorReduceSinkInfo.getReduceSinkBucketExpressions();
+
+    reduceSinkPartitionColumnMap = vectorReduceSinkInfo.getReduceSinkPartitionColumnMap();
+    reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
+    reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
+  }
+
+  /** Returns true when {@code columnMap} is non-null and names at least one column. */
+  private static boolean hasColumns(int[] columnMap) {
+    return columnMap != null && columnMap.length > 0;
+  }
+
+  /** Builds one standard writable ObjectInspector per entry of {@code typeInfos}. */
+  private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
+    final int size = typeInfos.length;
+    ObjectInspector[] objectInspectors = new ObjectInspector[size];
+    for (int i = 0; i < size; i++) {
+      objectInspectors[i] =
+          TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfos[i]);
+    }
+    return objectInspectors;
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    // Keys are serialized row by row, so the BinarySortable writer gets its own buffer.
+    keyOutput = new Output();
+    keyBinarySortableSerializeWrite.set(keyOutput);
+    keyVectorSerializeRow =
+        new VectorSerializeRow<BinarySortableSerializeWrite>(
+            keyBinarySortableSerializeWrite);
+    keyVectorSerializeRow.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap);
+
+    // Bucketing needs both a bucket count and at least one bucket column; without columns the
+    // object inspector setup below would fail on a null map (or bucket every row into 0).
+    numBuckets = conf.getNumBuckets();
+    hasBuckets = numBuckets > 0 && hasColumns(reduceSinkBucketColumnMap);
+    if (hasBuckets) {
+      bucketObjectInspectors = getObjectInspectorArray(reduceSinkBucketTypeInfos);
+      bucketVectorExtractRow = new VectorExtractRow();
+      bucketVectorExtractRow.init(reduceSinkBucketTypeInfos, reduceSinkBucketColumnMap);
+      bucketFieldValues = new Object[reduceSinkBucketTypeInfos.length];
+    }
+
+    // An empty partition column list must be treated like "no partition columns": the bucket
+    // hash code of an empty field array is a constant, which would give every row the same
+    // hash code and send all of them to a single reducer.
+    isPartitioned = hasColumns(reduceSinkPartitionColumnMap);
+    if (isPartitioned) {
+      partitionObjectInspectors = getObjectInspectorArray(reduceSinkPartitionTypeInfos);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("*NEW* partitionObjectInspectors " + Arrays.toString(partitionObjectInspectors));
+      }
+      partitionVectorExtractRow = new VectorExtractRow();
+      partitionVectorExtractRow.init(reduceSinkPartitionTypeInfos, reduceSinkPartitionColumnMap);
+      partitionFieldValues = new Object[reduceSinkPartitionTypeInfos.length];
+    } else {
+      // Fixed seed so the row-to-reducer spread is reproducible.
+      nonPartitionRandom = new Random(12345);
+    }
+  }
+
+  /** Evaluates every expression in {@code expressions} (which may be null) on {@code batch}. */
+  private static void evaluateExpressions(VectorExpression[] expressions,
+      VectorizedRowBatch batch) throws HiveException {
+    if (expressions == null) {
+      return;
+    }
+    for (VectorExpression ve : expressions) {
+      ve.evaluate(batch);
+    }
+  }
+
+  /**
+   * Computes the reducer hash code of one row.
+   *
+   * @param batch the batch holding the row (expressions already evaluated)
+   * @param batchIndex physical index of the row within {@code batch}
+   * @return partition hash code (or next random value when unpartitioned), combined as
+   *     {@code * 31 + bucketNumber} when bucketed
+   */
+  private int computeHashCode(VectorizedRowBatch batch, int batchIndex) throws HiveException {
+    final int keyHashCode;
+    if (isPartitioned) {
+      partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+      keyHashCode =
+          ObjectInspectorUtils.getBucketHashCode(
+              partitionFieldValues, partitionObjectInspectors);
+    } else {
+      keyHashCode = nonPartitionRandom.nextInt();
+    }
+    if (!hasBuckets) {
+      return keyHashCode;
+    }
+    bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+    final int bucketNum =
+        ObjectInspectorUtils.getBucketNumber(
+            bucketFieldValues, bucketObjectInspectors, numBuckets);
+    return keyHashCode * 31 + bucketNum;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      batchCounter++;
+
+      if (batch.size == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Perform key, value, bucket and partition expressions (in that order). Results go into
+      // scratch columns read by the serializers and extractors below.
+      evaluateExpressions(reduceSinkKeyExpressions, batch);
+      evaluateExpressions(reduceSinkValueExpressions, batch);
+      evaluateExpressions(reduceSinkBucketExpressions, batch);
+      evaluateExpressions(reduceSinkPartitionExpressions, batch);
+
+      final boolean selectedInUse = batch.selectedInUse;
+      final int[] selected = batch.selected;
+
+      final int size = batch.size;
+      for (int logical = 0; logical < size; logical++) {
+        final int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        final int hashCode = computeHashCode(batch, batchIndex);
+
+        // No key series here: every row gets its own freshly serialized key.
+        keyBinarySortableSerializeWrite.reset();
+        keyVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+        // Use the same logic as ReduceSinkOperator.toHiveKey: append the tag byte after the
+        // key unless there is no tag (-1) or tagging is skipped.
+        final int keyLength = keyOutput.getLength();
+        if (tag == -1 || reduceSkipTag) {
+          keyWritable.set(keyOutput.getData(), 0, keyLength);
+        } else {
+          keyWritable.setSize(keyLength + 1);
+          System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength);
+          keyWritable.get()[keyLength] = reduceTagByte;
+        }
+        keyWritable.setDistKeyLength(keyLength);
+        keyWritable.setHashCode(hashCode);
+
+        valueLazyBinarySerializeWrite.reset();
+        valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+        valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+
+        collect(keyWritable, valueBytesWritable);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
index b655e6e..a838f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
/*
* Specialized class for native vectorized reduce sink that is reducing on a single long key column.
*/
-public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkStringOperator extends VectorReduceSinkUniformHashOperator {
private static final long serialVersionUID = 1L;
private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
new file mode 100644
index 0000000..2dfa721
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/**
+ * Common base class for the native vectorized reduce sink operators that hash on the
+ * serialized key (the Long, String and MultiKey specializations extend it).
+ *
+ * Rows with equal serialized keys are grouped into a series by serializedKeySeries; each
+ * series shares one HiveKey (hash code taken from the key series) and emits one value per row.
+ * Rows whose key columns are all NULL use a precomputed all-null key and hash code.
+ */
+public abstract class VectorReduceSinkUniformHashOperator extends VectorReduceSinkCommonOperator {
+
+ private static final long serialVersionUID = 1L;
+ private static final String CLASS_NAME = VectorReduceSinkUniformHashOperator.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ // This class declares no constructor-initialized (non-transient) members; all of the
+ // state below is transient and is rebuilt at initialization time.
+ //---------------------------------------------------------------------------
+
+ // The serialized all null key and its hash code, built once in initializeOp and reused for
+ // every key series whose key columns are all NULL.
+ private transient byte[] nullBytes;
+ private transient int nullKeyHashCode;
+
+ // The object that determines equal key series.
+ // NOTE(review): not assigned in this class; presumably created by each subclass's
+ // initializeOp -- confirm, since process() dereferences it unconditionally.
+ protected transient VectorKeySeriesSerialized serializedKeySeries;
+
+
+ /** Kryo ctor. */
+ protected VectorReduceSinkUniformHashOperator() {
+ super();
+ }
+
+ public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx,
+ VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+ super(ctx, vContext, conf);
+ }
+
+ /**
+ * Initializes the common operator, then serializes a key made of one NULL per key column
+ * and caches its bytes and hash code.
+ *
+ * NOTE(review): this leaves keyBinarySortableSerializeWrite pointing at the local
+ * nullKeyOutput buffer; subclasses presumably re-point it at their own output -- confirm.
+ *
+ * @throws HiveException if building the all-null key fails
+ */
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+
+ // Create all nulls key.
+ try {
+ Output nullKeyOutput = new Output();
+ keyBinarySortableSerializeWrite.set(nullKeyOutput);
+ for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+ keyBinarySortableSerializeWrite.writeNull();
+ }
+ int nullBytesLength = nullKeyOutput.getLength();
+ nullBytes = new byte[nullBytesLength];
+ System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
+ nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ /**
+ * Emits every row of a VectorizedRowBatch to the output collector.
+ *
+ * Key and value expressions are evaluated first. Then the batch is walked one key series at a
+ * time: the HiveKey is filled once per series (all-null key or the series' serialized bytes,
+ * plus a trailing tag byte when tagging applies), and one LazyBinary value is serialized and
+ * collected per row of the series.
+ *
+ * @param row a VectorizedRowBatch
+ * @param tag the input tag; -1 means no tag byte is appended
+ * @throws HiveException wrapping any failure while processing the batch
+ */
+ @Override
+ public void process(Object row, int tag) throws HiveException {
+
+ try {
+ VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+ batchCounter++;
+
+ if (batch.size == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+ }
+ return;
+ }
+
+ // Perform any key expressions. Results will go into scratch columns.
+ if (reduceSinkKeyExpressions != null) {
+ for (VectorExpression ve : reduceSinkKeyExpressions) {
+ ve.evaluate(batch);
+ }
+ }
+
+ // Perform any value expressions. Results will go into scratch columns.
+ if (reduceSinkValueExpressions != null) {
+ for (VectorExpression ve : reduceSinkValueExpressions) {
+ ve.evaluate(batch);
+ }
+ }
+
+ // Group the batch rows into series of equal serialized keys.
+ serializedKeySeries.processBatch(batch);
+
+ boolean selectedInUse = batch.selectedInUse;
+ int[] selected = batch.selected;
+
+ int keyLength;
+ int logical;
+ int end;
+ int batchIndex;
+ // Outer loop: one iteration per key series, advanced by serializedKeySeries.next().
+ do {
+ if (serializedKeySeries.getCurrentIsAllNull()) {
+
+ // Use the same logic as ReduceSinkOperator.toHiveKey.
+ //
+ if (tag == -1 || reduceSkipTag) {
+ keyWritable.set(nullBytes, 0, nullBytes.length);
+ } else {
+ keyWritable.setSize(nullBytes.length + 1);
+ System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
+ keyWritable.get()[nullBytes.length] = reduceTagByte;
+ }
+ keyWritable.setDistKeyLength(nullBytes.length);
+ keyWritable.setHashCode(nullKeyHashCode);
+
+ } else {
+
+ // One serialized key for 1 or more rows for the duplicate keys; the tag byte (when
+ // present) is appended after the key and excluded from the distribution key length.
+ keyLength = serializedKeySeries.getSerializedLength();
+ if (tag == -1 || reduceSkipTag) {
+ keyWritable.set(serializedKeySeries.getSerializedBytes(),
+ serializedKeySeries.getSerializedStart(), keyLength);
+ } else {
+ keyWritable.setSize(keyLength + 1);
+ System.arraycopy(serializedKeySeries.getSerializedBytes(),
+ serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
+ keyWritable.get()[keyLength] = reduceTagByte;
+ }
+ keyWritable.setDistKeyLength(keyLength);
+ keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
+ }
+
+ // Inner loop: the series covers logical rows [logical, end); the same keyWritable is
+ // collected with each row's freshly serialized value.
+ logical = serializedKeySeries.getCurrentLogical();
+ end = logical + serializedKeySeries.getCurrentDuplicateCount();
+ do {
+ batchIndex = (selectedInUse ? selected[logical] : logical);
+
+ valueLazyBinarySerializeWrite.reset();
+ valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+ valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+
+ collect(keyWritable, valueBytesWritable);
+ } while (++logical < end);
+
+ if (!serializedKeySeries.next()) {
+ break;
+ }
+ } while (true);
+
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
index 6582cdd..c23d202 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
@@ -46,13 +46,12 @@ import org.apache.hadoop.mapred.Counters.Group;
* Each session uses a new object, which creates a new file.
*/
public class HiveHistoryImpl implements HiveHistory{
+ private static final Logger LOG = LoggerFactory.getLogger("hive.ql.exec.HiveHistoryImpl");
PrintWriter histStream; // History File stream
String histFileName; // History file name
- private static final Logger LOG = LoggerFactory.getLogger("hive.ql.exec.HiveHistoryImpl");
-
private static final Random randGen = new Random();
private LogHelper console;
@@ -305,7 +304,7 @@ public class HiveHistoryImpl implements HiveHistory{
/**
* write out counters.
*/
- static ThreadLocal<Map<String,String>> ctrMapFactory =
+ static final ThreadLocal<Map<String,String>> ctrMapFactory =
new ThreadLocal<Map<String, String>>() {
@Override
protected Map<String,String> initialValue() {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
index 2f0bd88..4380fe3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,66 +18,26 @@
package org.apache.hadoop.hive.ql.hooks;
-import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-public class HookUtils {
- /**
- * Returns the hooks specified in a configuration variable. The hooks are returned
- * in a list in the order they were specified in the configuration variable.
- *
- * @param conf Configuration object
- * @param hookConfVar The configuration variable specifying a comma separated list
- * of the hook class names.
- * @param clazz The super type of the hooks.
- * @return A list of the hooks cast as the type specified in clazz,
- * in the order they are listed in the value of hookConfVar
- * @throws ClassNotFoundException
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- public static <T extends Hook> List<T> getHooks(HiveConf conf,
- ConfVars hookConfVar, Class<T> clazz)
- throws InstantiationException, IllegalAccessException, ClassNotFoundException {
- String csHooks = conf.getVar(hookConfVar);
- List<T> hooks = new ArrayList<T>();
- if (csHooks == null) {
- return hooks;
- }
- csHooks = csHooks.trim();
- if (csHooks.equals("")) {
- return hooks;
- }
-
- String[] hookClasses = csHooks.split(",");
- for (String hookClass : hookClasses) {
- T hook = (T) Class.forName(hookClass.trim(), true,
- Utilities.getSessionSpecifiedClassLoader()).newInstance();
- hooks.add(hook);
- }
-
- return hooks;
- }
+public class HookUtils { // static hook helpers; hook instantiation itself is delegated to HooksLoader
public static String redactLogString(HiveConf conf, String logString)
- throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
String redactedString = logString;
if (conf != null && logString != null) {
- List<Redactor> queryRedactors = getHooks(conf, ConfVars.QUERYREDACTORHOOKS, Redactor.class);
+ List<Redactor> queryRedactors = new HooksLoader(conf).getHooks(ConfVars.QUERYREDACTORHOOKS); // redactors run in configured order, each on the previous one's output
for (Redactor redactor : queryRedactors) {
redactor.setConf(conf);
redactedString = redactor.redactQuery(redactedString);
}
}
-
return redactedString;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
new file mode 100644
index 0000000..0008726
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+/**
+ * A loader class for {@link Hook}s. It instantiates {@link Hook} objects from class names listed in a {@link HiveConf}. The
+ * methodology for how hooks are loaded is left up to the individual methods.
+ */
+public class HooksLoader {
+
+ private final HiveConf conf; // supplies the comma-separated hook class-name lists read by getHooks
+
+ /**
+ * Creates a new {@link HooksLoader} that uses the specified {@link HiveConf} to load the {@link Hook}s.
+ *
+ * @param conf the {@link HiveConf} to use when loading the {@link Hook}s
+ */
+ public HooksLoader(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Delegates to {@link #getHooks(HiveConf.ConfVars)} and prints an error to the specified {@link SessionState.LogHelper}
+ * if a {@link ClassNotFoundException} is thrown; that exception is then rethrown. Other exceptions are not printed.
+ *
+ * @param hookConfVar the configuration variable specifying a comma separated list of the hook class names
+ * @param console the {@link SessionState.LogHelper} to print to if a {@link ClassNotFoundException} is thrown by the
+ * {@link #getHooks(HiveConf.ConfVars)} method
+ *
+ * @return a list of the hook objects, in the order they are listed in the value of hookConfVar
+ *
+ * @throws ClassNotFoundException if the specified class names could not be found
+ * @throws IllegalAccessException if the specified class names could not be accessed
+ * @throws InstantiationException if the specified class names could not be instantiated
+ */
+ public <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar, SessionState.LogHelper console)
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ try {
+ return getHooks(hookConfVar);
+ } catch (ClassNotFoundException e) {
+ console.printError(hookConfVar.varname + " Class not found: " + e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the hooks specified in a configuration variable. The hooks are returned in a list in the order they were
+ * specified in the configuration variable. The value of the specified conf variable should be a comma separated list
+ * of class names where each class implements the {@link Hook} interface. The method uses reflection to create an
+ * instance of each class and returns them in an immutable {@link List} (empty if the value is null or blank).
+ *
+ * @param hookConfVar The configuration variable specifying a comma separated list of the hook class names
+ *
+ * @return a list of the hook objects, in the order they are listed in the value of hookConfVar
+ *
+ * @throws ClassNotFoundException if a specified class name could not be found (note: empty entries, e.g. "a,,b", are not skipped)
+ * @throws IllegalAccessException if the specified class names could not be accessed
+ * @throws InstantiationException if the specified class names could not be instantiated
+ */
+ public <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar)
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ String csHooks = conf.getVar(hookConfVar);
+ ImmutableList.Builder<T> hooks = ImmutableList.builder();
+ if (csHooks == null) {
+ return ImmutableList.of();
+ }
+
+ csHooks = csHooks.trim();
+ if (csHooks.isEmpty()) {
+ return ImmutableList.of();
+ }
+
+ String[] hookClasses = csHooks.split(",");
+ for (String hookClass : hookClasses) {
+ T hook = (T) Class.forName(hookClass.trim(), true, // unchecked cast: the loaded class is not verified to be a T here
+ Utilities.getSessionSpecifiedClassLoader()).newInstance();
+ hooks.add(hook);
+ }
+
+ return hooks.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index 2806c54..7305436 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.commons.collections.SetUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -403,7 +404,7 @@ public class LineageInfo implements Serializable {
* @param expr the expr to set
*/
public void setExpr(String expr) {
- this.expr = expr;
+ this.expr = StringInternUtils.internIfNotNull(expr);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
index 18ef325..ac79ceb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
@@ -60,10 +60,10 @@ public class PostExecOrcRowGroupCountPrinter implements ExecuteWithHookContext {
if (counters != null) {
for (CounterGroup group : counters) {
if (group.getName().equals(LlapIOCounters.class.getName())) {
- console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+ console.printInfo(tezTask.getId() + " LLAP IO COUNTERS:", false);
for (TezCounter counter : group) {
if (counter.getDisplayName().equals(LlapIOCounters.SELECTED_ROWGROUPS.name())) {
- console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
+ console.printInfo(" " + counter.getDisplayName() + ": " + counter.getValue(), false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
index 412f45c..45bd6e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
@@ -62,25 +62,25 @@ public class PostExecTezSummaryPrinter implements ExecuteWithHookContext {
String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
for (CounterGroup group : counters) {
if (hiveCountersGroup.equals(group.getDisplayName())) {
- console.printError(tezTask.getId() + " HIVE COUNTERS:");
+ console.printInfo(tezTask.getId() + " HIVE COUNTERS:", false);
for (TezCounter counter : group) {
- console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
+ console.printInfo(" " + counter.getDisplayName() + ": " + counter.getValue(), false);
}
} else if (group.getName().equals(FileSystemCounter.class.getName())) {
- console.printError(tezTask.getId() + " FILE SYSTEM COUNTERS:");
+ console.printInfo(tezTask.getId() + " FILE SYSTEM COUNTERS:", false);
for (TezCounter counter : group) {
// HDFS counters should be relatively consistent across test runs when compared to
// local file system counters
if (counter.getName().contains("HDFS")) {
- console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
+ console.printInfo(" " + counter.getDisplayName() + ": " + counter.getValue(), false);
}
}
} else if (group.getName().equals(LlapIOCounters.class.getName())) {
- console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+ console.printInfo(tezTask.getId() + " LLAP IO COUNTERS:", false);
List<String> testSafeCounters = LlapIOCounters.testSafeCounterNames();
for (TezCounter counter : group) {
if (testSafeCounters.contains(counter.getDisplayName())) {
- console.printError(" " + counter.getDisplayName() + ": " + counter.getValue());
+ console.printInfo(" " + counter.getDisplayName() + ": " + counter.getValue(), false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
index b4fc125..3e74396 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
@@ -116,8 +116,8 @@ public class PostExecutePrinter implements ExecuteWithHookContext {
}
if (queryState != null) {
- console.printError("POSTHOOK: query: " + queryState.getQueryString().trim());
- console.printError("POSTHOOK: type: " + queryState.getCommandType());
+ console.printInfo("POSTHOOK: query: " + queryState.getQueryString().trim(), false);
+ console.printInfo("POSTHOOK: type: " + queryState.getCommandType(), false);
}
PreExecutePrinter.printEntities(console, inputs, "POSTHOOK: Input: ");
@@ -167,7 +167,7 @@ public class PostExecutePrinter implements ExecuteWithHookContext {
}
sb.append("]");
- console.printError(sb.toString());
+ console.printInfo(sb.toString(), false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
index 232c62d..20acfb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
@@ -65,8 +65,8 @@ public class PreExecutePrinter implements ExecuteWithHookContext {
}
if (queryState != null) {
- console.printError("PREHOOK: query: " + queryState.getQueryString().trim());
- console.printError("PREHOOK: type: " + queryState.getCommandType());
+ console.printInfo("PREHOOK: query: " + queryState.getQueryString().trim(), false);
+ console.printInfo("PREHOOK: type: " + queryState.getCommandType(), false);
}
printEntities(console, inputs, "PREHOOK: Input: ");
@@ -80,7 +80,7 @@ public class PreExecutePrinter implements ExecuteWithHookContext {
}
Collections.sort(strings);
for (String s : strings) {
- console.printError(prefix + s);
+ console.printInfo(prefix + s, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
index 5340848..1845121 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.hooks;
import org.apache.hadoop.hive.conf.HiveConf;
+
public class QueryLifeTimeHookContextImpl implements QueryLifeTimeHookContext {
+
private HiveConf conf;
private String command;
- private HookContext hc = null;
+ private HookContext hc;
@Override
public HiveConf getHiveConf() {
@@ -54,4 +56,34 @@ public class QueryLifeTimeHookContextImpl implements QueryLifeTimeHookContext {
public void setHookContext(HookContext hc) {
this.hc = hc;
}
+
+ public static class Builder { // fluent builder; any field not set via a with*() method is left null and passed as null in build()
+
+ private HiveConf conf;
+ private String command;
+ private HookContext hc;
+
+ public Builder withHiveConf(HiveConf conf) { // each with*() method returns this builder for chaining
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder withCommand(String command) {
+ this.command = command;
+ return this;
+ }
+
+ public Builder withHookContext(HookContext hc) {
+ this.hc = hc;
+ return this;
+ }
+
+ public QueryLifeTimeHookContextImpl build() { // creates a new instance on every call and fills it via the public setters
+ QueryLifeTimeHookContextImpl queryLifeTimeHookContext = new QueryLifeTimeHookContextImpl();
+ queryLifeTimeHookContext.setHiveConf(this.conf);
+ queryLifeTimeHookContext.setCommand(this.command);
+ queryLifeTimeHookContext.setHookContext(this.hc);
+ return queryLifeTimeHookContext;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
new file mode 100644
index 0000000..787590d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+
+/**
+ * Extension of {@link QueryLifeTimeHook} that adds callbacks invoked before and after a query is parsed.
+ */
+public interface QueryLifeTimeHookWithParseHooks extends QueryLifeTimeHook {
+
+ /**
+ * Invoked before a query enters the parse phase.
+ *
+ * @param ctx the context for the hook
+ */
+ void beforeParse(QueryLifeTimeHookContext ctx);
+
+ /**
+ * Invoked after a query has been parsed. Note: if {@code hasError} is true,
+ * the query won't enter the following compilation phase.
+ *
+ * @param ctx the context for the hook
+ * @param hasError whether any error occurred during parsing
+ */
+ void afterParse(QueryLifeTimeHookContext ctx, boolean hasError);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
index a1408e9..2c3ba7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
@@ -26,10 +26,8 @@ import org.slf4j.LoggerFactory;
* Holds index related constants
*/
public class HiveIndex {
-
public static final Logger l4j = LoggerFactory.getLogger("HiveIndex");
-
- public static String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
+ public static final String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
public static enum IndexType {
AGGREGATE_TABLE("aggregate", AggregateIndexHandler.class.getName()),