You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text version.
Posted to commits@hive.apache.org by se...@apache.org on 2015/05/06 02:37:58 UTC
hive git commit: HIVE-10482 : LLAP: AssertionError cannot allocate
when reading from orc (Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/llap 2fbe0fae2 -> 8a62fc9c6
HIVE-10482 : LLAP: AssertionError cannot allocate when reading from orc (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8a62fc9c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8a62fc9c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8a62fc9c
Branch: refs/heads/llap
Commit: 8a62fc9c63ee1e30e02e702e365ec754211b4de0
Parents: 2fbe0fa
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 5 17:37:47 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 5 17:37:47 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/Allocator.java | 10 ++-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 95 +++++++++++++-------
.../hive/llap/cache/LlapOomDebugDump.java | 23 +++++
.../hive/llap/cache/LowLevelCacheImpl.java | 30 ++++++-
.../llap/cache/LowLevelCacheMemoryManager.java | 5 ++
.../hive/llap/cache/LowLevelCachePolicy.java | 3 +-
.../llap/cache/LowLevelFifoCachePolicy.java | 25 ++++++
.../llap/cache/LowLevelLrfuCachePolicy.java | 16 ++++
.../hadoop/hive/llap/cache/MemoryManager.java | 2 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 1 +
.../hive/llap/cache/TestBuddyAllocator.java | 53 ++++++-----
.../hive/llap/cache/TestLowLevelCacheImpl.java | 10 ++-
.../hive/llap/cache/TestOrcMetadataCache.java | 14 ++-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 4 +
.../hive/ql/io/orc/EncodedReaderImpl.java | 17 ++--
.../apache/hadoop/hive/ql/io/orc/InStream.java | 22 +----
16 files changed, 246 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
index 1bb64ae..4e990ef 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
@@ -19,9 +19,15 @@ package org.apache.hadoop.hive.llap.cache;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-
public interface Allocator {
- boolean allocateMultiple(LlapMemoryBuffer[] dest, int size);
+ public static class LlapCacheOutOfMemoryException extends RuntimeException {
+ public LlapCacheOutOfMemoryException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 268124648177151761L;
+ }
+ void allocateMultiple(LlapMemoryBuffer[] dest, int size) throws LlapCacheOutOfMemoryException;
void deallocate(LlapMemoryBuffer buffer);
boolean isDirectAlloc();
int getMaxAllocation();
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 4cc7682..9f1472f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -89,7 +89,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// TODO: would it make sense to return buffers asynchronously?
@Override
- public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size)
+ throws LlapCacheOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -114,29 +115,54 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
int startIndex = (int)(threadId % arenaCount), index = startIndex;
do {
int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
- if (newIx == dest.length) return true;
- if (newIx != -1) { // Shouldn't happen.
+ if (newIx == dest.length) return;
+ if (newIx != -1) { // TODO: check if it can still happen; count should take care of this.
ix = newIx;
}
+ ix = newIx;
if ((++index) == arenaCount) {
index = 0;
}
} while (index != startIndex);
}
- // Then try to split bigger blocks. TODO: again, ideally we would tryLock at least once
- for (int i = 0; i < arenaCount; ++i) {
- int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
- if (newIx == -1) break; // Shouldn't happen.
- if (newIx == dest.length) return true;
- ix = newIx;
- }
- // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (int i = arenaCount; i < arenas.length; ++i) {
- ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
- if (ix == dest.length) return true;
+ // TODO: this is very hacky.
+ // We called reserveMemory so we know that somewhere in there, there's memory waiting for us.
+ // However, we have a class of rare race conditions related to the order of locking/checking of
+ // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
+ // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
+ // we look at arena 2 and find no memory. Or, for single arena, 2 threads reserve 256k each,
+ // and a single 1Mb block is available. When the 1st thread locks the 1Mb freelist, the 2nd one
+ // might have already examined the 256k and 512k lists, finding nothing. Blocks placed by (1)
+ // into smaller lists after its split is done will not be found by (2); given that freelist
+ // locks don't overlap, (2) may even run completely between the time (1) takes out the 1Mb
+ // block and the time it returns the remaining 768Kb.
+ // Two solutions to this are some form of cross-thread helping (threads putting "demand"
+ // into some sort of queues that deallocate and split will examine), or having and "actor"
+ // allocator thread (or threads per arena).
+ // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
+ // But for now we will just retry 5 times 0_o
+ for (int attempt = 0; attempt < 5; ++attempt) {
+ // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
+ for (int i = 0; i < arenaCount; ++i) {
+ int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
+ if (newIx == -1) break; // Shouldn't happen.
+ if (newIx == dest.length) return;
+ ix = newIx;
+ }
+ if (attempt == 0) {
+ // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+ for (int i = arenaCount; i < arenas.length; ++i) {
+ ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
+ if (ix == dest.length) return;
+ }
+ }
+ LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt);
}
- return false;
+ String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
+ LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
+ + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
+ throw new LlapCacheOutOfMemoryException(msg);
}
@Override
@@ -170,7 +196,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
for (Arena arena : arenas) {
arena.debugDump(result);
}
- result.append("\n");
return result.toString();
}
@@ -237,14 +262,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// Try to get as consistent view as we can; make copy of the headers.
byte[] headers = new byte[this.headers.length];
System.arraycopy(this.headers, 0, headers, 0, headers.length);
- for (int i = 0; i < headers.length; ++i) {
- byte header = headers[i];
- if (header == 0) continue;
- int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
- boolean isFree = (header & 1) == 0;
- result.append("\n block " + i + " at " + offset + ": size "
- + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
- }
int allocSize = minAllocation;
for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
result.append("\n free list for size " + allocSize + ": ");
@@ -260,6 +277,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
freeList.lock.unlock();
}
}
+ for (int i = 0; i < headers.length; ++i) {
+ byte header = headers[i];
+ if (header == 0) continue;
+ int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
+ boolean isFree = (header & 1) == 0;
+ result.append("\n block " + i + " at " + offset + ": size "
+ + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+ }
}
private int freeListFromHeader(byte header) {
@@ -297,25 +322,30 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
int headerStep = 1 << freeListIx;
int splitListIx = freeListIx + 1;
while (remaining > 0 && splitListIx < freeLists.length) {
- int splitWays = 1 << (splitListIx - freeListIx);
- int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+ int splitWaysLog2 = (splitListIx - freeListIx);
+ assert splitWaysLog2 > 0;
+ int splitWays = 1 << splitWaysLog2; // How many ways each block splits into target size.
+ int lastSplitBlocksRemaining = -1; // How many target-sized blocks remain from last split.
+ int lastSplitNextHeader = -1; // The header index for the beginning of the remainder.
FreeList splitList = freeLists[splitListIx];
splitList.lock.lock();
try {
int headerIx = splitList.listHead;
while (headerIx >= 0 && remaining > 0) {
int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
- int toTake = Math.min(splitWays, remaining);
+ int toTake = Math.min(splitWays, remaining); // We split it splitWays and take toTake.
remaining -= toTake;
- lastSplitBlocksRemaining = splitWays - toTake;
+ lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
+ // Take toTake blocks by splitting the block at origOffset.
for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
headers[headerIx] = headerData;
+ // TODO: this could be done out of the lock, we only need to take the blocks out.
((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
}
- lastSplitNextHeader = headerIx;
- headerIx = data.getInt(origOffset + 4);
+ lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
+ headerIx = data.getInt(origOffset + 4); // Get next item from the free list.
}
- replaceListHeadUnderLock(splitList, headerIx);
+ replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
} finally {
splitList.lock.unlock();
}
@@ -449,6 +479,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
}
+ private static class Request {
+
+ }
private static class FreeList {
ReentrantLock lock = new ReentrantLock(false);
int listHead = -1; // Index of where the buffer is; in minAllocation units
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
new file mode 100644
index 0000000..30bf5a9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LlapOomDebugDump {
+ String debugDumpForOom();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 4855d46..249ed56 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import com.google.common.annotations.VisibleForTesting;
-public class LowLevelCacheImpl implements LowLevelCache {
+public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
private final EvictionAwareAllocator allocator;
private AtomicInteger newEvictions = new AtomicInteger(0);
@@ -308,6 +308,9 @@ public class LowLevelCacheImpl implements LowLevelCache {
if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
cachePolicy.notifyUnlock(buffer);
} else {
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+ }
allocator.deallocate(buffer);
}
}
@@ -469,4 +472,29 @@ public class LowLevelCacheImpl implements LowLevelCache {
public Allocator getAllocator() {
return allocator;
}
+
+ @Override
+ public String debugDumpForOom() {
+ StringBuilder sb = new StringBuilder("File cache state ");
+ for (Map.Entry<Long, FileCache> e : cache.entrySet()) {
+ if (!e.getValue().incRef()) continue;
+ try {
+ sb.append("\n file " + e.getKey());
+ for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+ if (e2.getValue().incRef() < 0) continue;
+ try {
+ sb.append("\n [").append(e2.getKey()).append(", ")
+ .append(e2.getKey() + e2.getValue().declaredCachedLength)
+ .append(") => ").append(e2.getValue().toString())
+ .append(" alloc ").append(e2.getValue().byteBuffer.position());
+ } finally {
+ e2.getValue().decRef();
+ }
+ }
+ } finally {
+ e.getValue().decRef();
+ }
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 405b14f..85f66f8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -99,4 +99,9 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
} while (!usedMemory.compareAndSet(oldV, oldV - memoryToRelease));
}
+ @Override
+ public String debugDumpForOom() {
+ return "cache state\n" + evictor.debugDumpForOom();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 85cce31..0b50749 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hive.llap.cache;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
-public interface LowLevelCachePolicy {
+public interface LowLevelCachePolicy extends LlapOomDebugDump {
void cache(LlapCacheableBuffer buffer, Priority priority);
void notifyLock(LlapCacheableBuffer buffer);
void notifyUnlock(LlapCacheableBuffer buffer);
long evictSomeBlocks(long memoryToReserve);
void setEvictionListener(EvictionListener listener);
+ void setParentDebugDumper(LlapOomDebugDump dumper);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 74e975c..a1ed7ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -35,6 +35,7 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
private final Lock lock = new ReentrantLock();
private final LinkedList<LlapCacheableBuffer> buffers;
private EvictionListener evictionListener;
+ private LlapOomDebugDump parentDebugDump;
public LowLevelFifoCachePolicy(Configuration conf) {
if (LlapIoImpl.LOGL.isInfoEnabled()) {
@@ -70,6 +71,11 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
}
@Override
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ this.parentDebugDump = dumper;
+ }
+
+ @Override
public long evictSomeBlocks(long memoryToReserve) {
long evicted = 0;
lock.lock();
@@ -88,4 +94,23 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
}
return evicted;
}
+
+ @Override
+ public String debugDumpForOom() {
+ StringBuilder sb = new StringBuilder("FIFO eviction list: ");
+ lock.lock();
+ try {
+ sb.append(buffers.size()).append(" elements): ");
+ Iterator<LlapCacheableBuffer> iter = buffers.iterator();
+ while (iter.hasNext()) {
+ sb.append(iter.next().toStringForCache()).append(",\n");
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (parentDebugDump != null) {
+ sb.append("\n").append(parentDebugDump.debugDumpForOom());
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index f7b493d..b43b31d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -64,6 +64,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
/** Number of elements. */
private int heapSize = 0;
private EvictionListener evictionListener;
+ private LlapOomDebugDump parentDebugDump;
public LowLevelLrfuCachePolicy(Configuration conf) {
long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
@@ -170,6 +171,12 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
@Override
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ this.parentDebugDump = dumper;
+ }
+
+
+ @Override
public long evictSomeBlocks(long memoryToReserve) {
long evicted = 0;
// In normal case, we evict the items from the list.
@@ -411,4 +418,13 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
return result.toString();
}
+
+ @Override
+ public String debugDumpForOom() {
+ String result = debugDumpHeap();
+ if (parentDebugDump != null) {
+ result += "\n" + parentDebugDump.debugDumpForOom();
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index d454ec8..8e167ec 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.llap.cache;
-public interface MemoryManager {
+public interface MemoryManager extends LlapOomDebugDump {
boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
void releaseMemory(long memUsage);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 2333331..761aefe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -102,6 +102,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
+ cachePolicy.setParentDebugDumper(orcCache);
orcCache.init();
} else {
cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 3bea70f..d35edb7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.cache.Allocator.LlapCacheOutOfMemoryException;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Test;
@@ -47,21 +49,26 @@ public class TestBuddyAllocator {
@Override
public void releaseMemory(long memUsage) {
}
+
+ @Override
+ public String debugDumpForOom() {
+ return "";
+ }
}
@Test
- public void testVariableSizeAllocs() {
+ public void testVariableSizeAllocs() throws Exception {
testVariableSizeInternal(1, 2, 1);
}
@Test
- public void testVariableSizeMultiAllocs() {
+ public void testVariableSizeMultiAllocs() throws Exception {
testVariableSizeInternal(3, 2, 3);
testVariableSizeInternal(5, 2, 5);
}
@Test
- public void testSameSizes() {
+ public void testSameSizes() throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max;
Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc);
BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -72,7 +79,7 @@ public class TestBuddyAllocator {
}
@Test
- public void testMultipleArenas() {
+ public void testMultipleArenas() throws Exception {
int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
Configuration conf = createConf(1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount);
BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -88,26 +95,29 @@ public class TestBuddyAllocator {
LlapDaemonCacheMetrics.create("test", "1"));
ExecutorService executor = Executors.newFixedThreadPool(3);
final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
- FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
syncThreadStart(cdlIn, cdlOut);
allocateUp(a, min, max, allocsPerSize, false);
allocateUp(a, min, max, allocsPerSize, true);
+ return null;
}
- }, null), downTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ }), downTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
syncThreadStart(cdlIn, cdlOut);
allocateDown(a, min, max, allocsPerSize, false);
allocateDown(a, min, max, allocsPerSize, true);
+ return null;
}
- }, null), sameTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ }), sameTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
syncThreadStart(cdlIn, cdlOut);
for (int i = min; i <= max; ++i) {
allocSameSize(a, (1 << (max - i)) * allocsPerSize, i);
}
+ return null;
}
- }, null);
+ });
executor.execute(sameTask);
executor.execute(upTask);
executor.execute(downTask);
@@ -131,7 +141,8 @@ public class TestBuddyAllocator {
}
}
- private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) {
+ private void testVariableSizeInternal(
+ int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount);
BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -144,7 +155,7 @@ public class TestBuddyAllocator {
allocateDown(a, min, max, allocCount, true);
}
- private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) {
+ private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) throws Exception {
LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[allocCount][];
long[][] testValues = new long[allocCount][];
for (int j = 0; j < allocCount; ++j) {
@@ -153,8 +164,8 @@ public class TestBuddyAllocator {
deallocUpOrDown(a, false, allocs, testValues);
}
- private void allocateUp(
- BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ private void allocateUp(BuddyAllocator a, int min, int max, int allocPerSize,
+ boolean isSameOrderDealloc) throws Exception {
int sizes = max - min + 1;
LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
// Put in the beginning; relies on the knowledge of internal implementation. Pave?
@@ -165,8 +176,8 @@ public class TestBuddyAllocator {
deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
}
- private void allocateDown(
- BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ private void allocateDown(BuddyAllocator a, int min, int max, int allocPerSize,
+ boolean isSameOrderDealloc) throws Exception {
int sizes = max - min + 1;
LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
// Put in the beginning; relies on the knowledge of internal implementation. Pave?
@@ -178,13 +189,15 @@ public class TestBuddyAllocator {
}
private void allocateAndUseBuffer(BuddyAllocator a, LlapMemoryBuffer[][] allocs,
- long[][] testValues, int allocCount, int index, int sizeLog2) {
+ long[][] testValues, int allocCount, int index, int sizeLog2) throws Exception {
allocs[index] = new LlapMemoryBuffer[allocCount];
testValues[index] = new long[allocCount];
int size = (1 << sizeLog2) - 1;
- if (!a.allocateMultiple(allocs[index], size)) {
+ try {
+ a.allocateMultiple(allocs[index], size);
+ } catch (LlapCacheOutOfMemoryException ex) {
LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDump());
- fail();
+ throw ex;
}
// LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
for (int j = 0; j < allocCount; ++j) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 47bdf1e..2c87ec1 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -54,13 +54,12 @@ public class TestLowLevelCacheImpl {
private static class DummyAllocator implements EvictionAwareAllocator {
@Override
- public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
for (int i = 0; i < dest.length; ++i) {
LlapDataBuffer buf = new LlapDataBuffer();
buf.initialize(0, null, -1, size);
dest[i] = buf;
}
- return true;
}
@Override
@@ -101,6 +100,13 @@ public class TestLowLevelCacheImpl {
public void setEvictionListener(EvictionListener listener) {
}
+
+ public String debugDumpForOom() {
+ return "";
+ }
+
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 9d769c5..513aedf 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.junit.Test;
public class TestOrcMetadataCache {
- private static final Log LOG = LogFactory.getLog(TestOrcMetadataCache.class);
-
private static class DummyCachePolicy implements LowLevelCachePolicy {
public DummyCachePolicy() {
}
@@ -49,6 +47,13 @@ public class TestOrcMetadataCache {
public void setEvictionListener(EvictionListener listener) {
}
+
+ public String debugDumpForOom() {
+ return "";
+ }
+
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ }
}
private static class DummyMemoryManager implements MemoryManager {
@@ -64,6 +69,11 @@ public class TestOrcMetadataCache {
public void releaseMemory(long memUsage) {
--allocs;
}
+
+ @Override
+ public String debugDumpForOom() {
+ return "";
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 8dc62d8..fc58cf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -35,6 +35,10 @@ public class DebugUtils {
return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
}
+ public static boolean isTraceRangesEnabled() {
+ return true; // TODO: temporary, should be hardcoded false
+ }
+
public static boolean isTraceLockingEnabled() {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index 016f470..e931d09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -336,12 +336,14 @@ public class EncodedReaderImpl implements EncodedReader {
// 2. Now, read all of the ranges from cache or disk.
toRead = new DiskRangeListMutateHelper(listToRead.get());
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
LOG.info("Resulting disk ranges to read (file " + fileId + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -350,7 +352,8 @@ public class EncodedReaderImpl implements EncodedReader {
RecordReaderUtils.readDiskRanges(
file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -369,7 +372,8 @@ public class EncodedReaderImpl implements EncodedReader {
}
}
}
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after pre-read (file " + fileId + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -454,8 +458,9 @@ public class EncodedReaderImpl implements EncodedReader {
}
releaseContexts(colCtxs);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Disk ranges after processing all the data "
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
+ LOG.info("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index e04ee4d..b7633ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -756,10 +756,7 @@ public abstract class InStream extends InputStream {
targetBuffers[ix] = chunk.buffer;
++ix;
}
- boolean canAlloc = cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
+ cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
// 4. Now decompress (or copy) the data into cache buffers.
for (ProcCacheChunk chunk : toDecompress) {
@@ -1017,11 +1014,8 @@ public abstract class InStream extends InputStream {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
++ix;
}
- boolean canAlloc = cache.getAllocator().allocateMultiple(
+ cache.getAllocator().allocateMultiple(
targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
// 4. Now copy the data into cache buffers.
ix = 0;
@@ -1070,11 +1064,7 @@ public abstract class InStream extends InputStream {
// We thought we had the entire part to cache, but we don't; convert start to
// non-cached. Since we are at the first gap, the previous stuff must be contiguous.
singleAlloc[0] = null;
- boolean canAlloc = cache.getAllocator().allocateMultiple(
- singleAlloc, (int)(candidateEnd - partOffset));
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
+ cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
LlapMemoryBuffer buffer = singleAlloc[0];
cache.notifyReused(buffer);
@@ -1088,11 +1078,7 @@ public abstract class InStream extends InputStream {
private static TrackedCacheChunk copyAndReplaceUncompressedToNonCached(
BufferChunk bc, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) {
singleAlloc[0] = null;
- boolean canAlloc = cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
-
+ cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
LlapMemoryBuffer buffer = singleAlloc[0];
cache.notifyReused(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();