You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/21 20:36:33 UTC

hive git commit: HIVE-20127 : fix some issues with LLAP Parquet cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 788daf609 -> cce3a0521


HIVE-20127 : fix some issues with LLAP Parquet cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cce3a052
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cce3a052
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cce3a052

Branch: refs/heads/master
Commit: cce3a05213bb7cea48587f0dfbcaadd2e076db2c
Parents: 788daf6
Author: sergey <se...@apache.org>
Authored: Sat Jul 21 13:33:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Sat Jul 21 13:33:40 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/llap/LlapCacheAwareFs.java   | 12 +++++++++---
 .../org/apache/hadoop/hive/ql/io/HiveInputFormat.java   |  7 ++++---
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cce3a052/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index f68ebd7..dcb24b80 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -224,6 +224,7 @@ public class LlapCacheAwareFs extends FileSystem {
           int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
           ByteBuffer data = drl.getData().duplicate();
           data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+          cache.releaseBuffer(((CacheChunk)drl).getBuffer());
           sizeRead += candidateSize;
           drl = drl.next;
         }
@@ -250,6 +251,7 @@ public class LlapCacheAwareFs extends FileSystem {
         if (candidate.hasData()) {
           ByteBuffer data = candidate.getData().duplicate();
           data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+          cache.releaseBuffer(((CacheChunk)candidate).getBuffer());
           sizeRead += candidateSize;
           continue;
         }
@@ -269,6 +271,10 @@ public class LlapCacheAwareFs extends FileSystem {
           long chunkFrom = Math.max(from, missingChunk.getKey()),
               chunkTo = Math.min(to, missingChunk.getValue()),
               chunkLength = chunkTo - chunkFrom;
+          // TODO: if we allow partial reads (right now we disable this), we'd have to handle it here.
+          //       chunksInThisRead should probably be changed to be a struct array indicating both
+          //       partial and full sizes for each chunk; then the partial ones could be merged
+          //       with the previous partial ones, and any newly-full chunks put in the cache.
           MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null;
           try {
             int largeBufCount = (int) (chunkLength / maxAlloc);
@@ -364,12 +370,12 @@ public class LlapCacheAwareFs extends FileSystem {
         int maxAlloc, long from, long to) {
       Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from);
       if (firstMissing == null) {
-        throw new AssertionError("No lower bound for offset " + from);
+        throw new AssertionError("No lower bound for start offset " + from);
       }
       if (firstMissing.getValue() <= from
           || ((from - firstMissing.getKey()) % maxAlloc) != 0) {
         // The data does not belong to a recognized chunk, or is split wrong.
-        throw new AssertionError("Lower bound for offset " + from + " is ["
+        throw new AssertionError("Lower bound for start offset " + from + " is ["
             + firstMissing.getKey() + ", " + firstMissing.getValue() + ")");
       }
       SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to);
@@ -381,7 +387,7 @@ public class LlapCacheAwareFs extends FileSystem {
       if (lastMissingEnd < to
           || (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) {
         // The data does not belong to a recognized chunk, or is split wrong.
-        throw new AssertionError("Lower bound for offset " + to + " is ["
+        throw new AssertionError("Lower bound for end offset " + to + " is ["
             + lastMissingOffset + ", " + lastMissingEnd + ")");
       }
       return missingChunks;

http://git-wip-us.apache.org/repos/asf/hive/blob/cce3a052/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ec8527e..3cb7ab5 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -274,7 +274,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       if (part == null) {
         if (isCacheOnly) {
           LOG.info("Using cache only because there's no partition spec for SerDe-based IF");
-          injectLlapCaches(inputFormat, llapIo);
+          injectLlapCaches(inputFormat, llapIo, conf);
         } else {
           LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF");
         }
@@ -294,7 +294,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       }
     }
     if (isCacheOnly) {
-      injectLlapCaches(inputFormat, llapIo);
+      injectLlapCaches(inputFormat, llapIo, conf);
     }
     return inputFormat;
   }
@@ -320,8 +320,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat,
-      LlapIo<VectorizedRowBatch> llapIo) {
+      LlapIo<VectorizedRowBatch> llapIo, Configuration conf) {
     LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName());
+    conf.setInt("parquet.read.allocation.size", 1024*1024*1024); // Disable buffer splitting for now.
     llapIo.initCacheOnlyInputFormat(inputFormat);
   }