You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/21 20:36:33 UTC
hive git commit: HIVE-20127 : fix some issues with LLAP Parquet cache
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 788daf609 -> cce3a0521
HIVE-20127 : fix some issues with LLAP Parquet cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cce3a052
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cce3a052
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cce3a052
Branch: refs/heads/master
Commit: cce3a05213bb7cea48587f0dfbcaadd2e076db2c
Parents: 788daf6
Author: sergey <se...@apache.org>
Authored: Sat Jul 21 13:33:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Sat Jul 21 13:33:40 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/llap/LlapCacheAwareFs.java | 12 +++++++++---
.../org/apache/hadoop/hive/ql/io/HiveInputFormat.java | 7 ++++---
2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cce3a052/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index f68ebd7..dcb24b80 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -224,6 +224,7 @@ public class LlapCacheAwareFs extends FileSystem {
int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
ByteBuffer data = drl.getData().duplicate();
data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+ cache.releaseBuffer(((CacheChunk)drl).getBuffer());
sizeRead += candidateSize;
drl = drl.next;
}
@@ -250,6 +251,7 @@ public class LlapCacheAwareFs extends FileSystem {
if (candidate.hasData()) {
ByteBuffer data = candidate.getData().duplicate();
data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+ cache.releaseBuffer(((CacheChunk)candidate).getBuffer());
sizeRead += candidateSize;
continue;
}
@@ -269,6 +271,10 @@ public class LlapCacheAwareFs extends FileSystem {
long chunkFrom = Math.max(from, missingChunk.getKey()),
chunkTo = Math.min(to, missingChunk.getValue()),
chunkLength = chunkTo - chunkFrom;
+ // TODO: if we allow partial reads (right now we disable this), we'd have to handle it here.
+ // chunksInThisRead should probably be changed to be a struct array indicating both
+ // partial and full sizes for each chunk; then the partial ones could be merged
+ // with the previous partial ones, and any newly-full chunks put in the cache.
MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null;
try {
int largeBufCount = (int) (chunkLength / maxAlloc);
@@ -364,12 +370,12 @@ public class LlapCacheAwareFs extends FileSystem {
int maxAlloc, long from, long to) {
Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from);
if (firstMissing == null) {
- throw new AssertionError("No lower bound for offset " + from);
+ throw new AssertionError("No lower bound for start offset " + from);
}
if (firstMissing.getValue() <= from
|| ((from - firstMissing.getKey()) % maxAlloc) != 0) {
// The data does not belong to a recognized chunk, or is split wrong.
- throw new AssertionError("Lower bound for offset " + from + " is ["
+ throw new AssertionError("Lower bound for start offset " + from + " is ["
+ firstMissing.getKey() + ", " + firstMissing.getValue() + ")");
}
SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to);
@@ -381,7 +387,7 @@ public class LlapCacheAwareFs extends FileSystem {
if (lastMissingEnd < to
|| (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) {
// The data does not belong to a recognized chunk, or is split wrong.
- throw new AssertionError("Lower bound for offset " + to + " is ["
+ throw new AssertionError("Lower bound for end offset " + to + " is ["
+ lastMissingOffset + ", " + lastMissingEnd + ")");
}
return missingChunks;
http://git-wip-us.apache.org/repos/asf/hive/blob/cce3a052/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ec8527e..3cb7ab5 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -274,7 +274,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
if (part == null) {
if (isCacheOnly) {
LOG.info("Using cache only because there's no partition spec for SerDe-based IF");
- injectLlapCaches(inputFormat, llapIo);
+ injectLlapCaches(inputFormat, llapIo, conf);
} else {
LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF");
}
@@ -294,7 +294,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
}
if (isCacheOnly) {
- injectLlapCaches(inputFormat, llapIo);
+ injectLlapCaches(inputFormat, llapIo, conf);
}
return inputFormat;
}
@@ -320,8 +320,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat,
- LlapIo<VectorizedRowBatch> llapIo) {
+ LlapIo<VectorizedRowBatch> llapIo, Configuration conf) {
LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName());
+ conf.setInt("parquet.read.allocation.size", 1024*1024*1024); // Disable buffer splitting for now.
llapIo.initCacheOnlyInputFormat(inputFormat);
}