You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/31 21:17:20 UTC
hive git commit: HIVE-20244 : forward port HIVE-19704 to master
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 87ce36b45 -> 5b2124b99
HIVE-20244 : forward port HIVE-19704 to master (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b2124b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b2124b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b2124b9
Branch: refs/heads/master
Commit: 5b2124b99fee059d3e8ce71e8ac646a9faed84d5
Parents: 87ce36b
Author: sergey <se...@apache.org>
Authored: Tue Jul 31 14:14:18 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Tue Jul 31 14:14:18 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/BuddyAllocator.java | 17 +++--
.../llap/cache/LowLevelCacheMemoryManager.java | 51 +++++++------
.../hadoop/hive/llap/cache/MemoryManager.java | 4 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 14 ++--
.../llap/io/encoded/SerDeEncodedDataReader.java | 34 ++++++---
.../hive/llap/io/metadata/MetadataCache.java | 76 ++++++++++++--------
.../hive/llap/cache/TestBuddyAllocator.java | 3 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 12 ++--
.../hive/llap/cache/TestOrcMetadataCache.java | 19 +++--
.../hadoop/hive/llap/LlapCacheAwareFs.java | 2 +
.../hive/ql/io/orc/encoded/EncodedReader.java | 3 +
.../ql/io/orc/encoded/EncodedReaderImpl.java | 31 +++++---
.../ql/io/orc/encoded/StoppableAllocator.java | 30 ++++++++
.../vector/VectorizedParquetRecordReader.java | 5 +-
.../hive/common/io/FileMetadataCache.java | 17 +++--
15 files changed, 223 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index e3ce2e7..a27964f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.llap.cache;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
@@ -45,9 +46,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
public final class BuddyAllocator
- implements EvictionAwareAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
+ implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapIoDebugDump {
private final Arena[] arenas;
private final AtomicInteger allocatedArenas = new AtomicInteger(0);
@@ -224,16 +226,23 @@ public final class BuddyAllocator
return (int)arenaSizeVal;
}
+
+ @VisibleForTesting
@Override
public void allocateMultiple(MemoryBuffer[] dest, int size)
throws AllocatorOutOfMemoryException {
- allocateMultiple(dest, size, null);
+ allocateMultiple(dest, size, null, null);
}
- // TODO: would it make sense to return buffers asynchronously?
@Override
public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
throws AllocatorOutOfMemoryException {
+ allocateMultiple(dest, size, factory, null);
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
+ throws AllocatorOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -243,7 +252,7 @@ public final class BuddyAllocator
int allocationSize = 1 << allocLog2;
// If using async, we could also reserve one by one.
- memoryManager.reserveMemory(dest.length << allocLog2);
+ memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
// Note: this is backward compat only. Should be removed with createUnallocated.
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 4297cfc..c5b5bf2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -18,14 +18,14 @@
package org.apache.hadoop.hive.llap.cache;
-import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implementation of memory manager for low level cache. Note that memory is released during
* reserve most of the time, by calling the evictor to evict some memory. releaseMemory is
@@ -49,21 +49,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
}
}
+ public static class ReserveFailedException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ public ReserveFailedException(AtomicBoolean isStopped) {
+ super("Cannot reserve memory"
+ + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : "")
+ + ((isStopped != null && isStopped.get()) ? "; thread stopped" : ""));
+ }
+ }
@Override
- public void reserveMemory(final long memoryToReserve) {
- boolean result = reserveMemory(memoryToReserve, true);
+ public void reserveMemory(final long memoryToReserve, AtomicBoolean isStopped) {
+ boolean result = reserveMemory(memoryToReserve, true, isStopped);
if (result) return;
// Can only happen if there's no evictor, or if thread is interrupted.
- throw new RuntimeException("Cannot reserve memory"
- + (Thread.currentThread().isInterrupted() ? "; thread interrupted" : ""));
+ throw new ReserveFailedException(isStopped);
}
@VisibleForTesting
- public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) {
+ public boolean reserveMemory(final long memoryToReserve,
+ boolean waitForEviction, AtomicBoolean isStopped) {
// TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
int badCallCount = 0;
- int nextLog = 4;
long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve;
boolean result = true;
while (remainingToReserve > 0) {
@@ -79,21 +86,23 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
// TODO: for one-block case, we could move notification for the last block out of the loop.
long evicted = evictor.evictSomeBlocks(remainingToReserve);
if (evicted == 0) {
+ ++badCallCount;
if (!waitForEviction) {
result = false;
break;
}
- ++badCallCount;
- if (badCallCount == nextLog) {
- LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
- nextLog <<= 1;
- try {
- Thread.sleep(Math.min(1000, nextLog));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- result = false;
- break;
- }
+
+ if (isStopped != null && isStopped.get()) {
+ result = false;
+ break;
+ }
+ try {
+ Thread.sleep(badCallCount > 9 ? 1000 : (1 << badCallCount));
+ } catch (InterruptedException e) {
+ LlapIoImpl.LOG.warn("Thread interrupted"); // We currently don't expect this.
+ Thread.currentThread().interrupt();
+ result = false;
+ break;
}
continue;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index 542041d..fedade5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hive.llap.cache;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public interface MemoryManager {
void releaseMemory(long memUsage);
void updateMaxSize(long maxSize);
- void reserveMemory(long memoryToReserve);
+ void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index b76b0de..e8a3b40 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -173,7 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
* Contains only stripes that are read, and only columns included. null => read all RGs.
*/
private boolean[][] stripeRgs;
- private volatile boolean isStopped = false;
+ private AtomicBoolean isStopped = new AtomicBoolean(false);
@SuppressWarnings("unused")
private volatile boolean isPaused = false;
@@ -240,7 +241,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public void stop() {
LOG.debug("Encoded reader is being stopped");
- isStopped = true;
+ isStopped.set(true);
}
@Override
@@ -436,6 +437,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
stripeReader = orcReader.encodedReader(
fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+ stripeReader.setStopped(isStopped);
}
private void recordReaderTime(long startTime) {
@@ -454,7 +456,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
private boolean processStop() {
- if (!isStopped) return false;
+ if (!isStopped.get()) return false;
LOG.info("Encoded data reader is stopping");
tracePool.offer(trace);
cleanupReaders();
@@ -584,7 +586,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
if (hasCache) {
- tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
+ tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, isStopped);
metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer.
}
FileTail ft = orcReader.getFileTail();
@@ -677,7 +679,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
if (hasCache) {
LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
- stripeKey, footerRange.getData().duplicate(), cacheTag);
+ stripeKey, footerRange.getData().duplicate(), cacheTag, isStopped);
metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
}
ByteBuffer bb = footerRange.getData().duplicate();
@@ -918,7 +920,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return lowLevelCache.putFileData(
fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
} else if (metadataCache != null) {
- metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+ metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset, isStopped);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 5b54af5..2576175 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -150,7 +152,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final String cacheTag;
private final FileSystem fs;
- private volatile boolean isStopped = false;
+ private AtomicBoolean isStopped = new AtomicBoolean(false);
private final Deserializer sourceSerDe;
private final InputFormat<?, ?> sourceInputFormat;
private final Reporter reporter;
@@ -245,7 +247,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
@Override
public void stop() {
LlapIoImpl.LOG.debug("Encoded reader is being stopped");
- isStopped = true;
+ isStopped.set(true);
}
@Override
@@ -344,16 +346,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final Map<StreamName, OutputReceiver> streams = new HashMap<>();
private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
private final boolean doesSourceHaveIncludes;
+ private final AtomicBoolean isStopped;
public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
boolean[] writerIncludes, boolean doesSourceHaveIncludes,
- Allocator.BufferObjectFactory bufferFactory) {
+ Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
this.bufferManager = bufferManager;
assert writerIncludes != null; // Taken care of on higher level.
this.writerIncludes = writerIncludes;
this.doesSourceHaveIncludes = doesSourceHaveIncludes;
this.columnIds = columnIds;
this.bufferFactory = bufferFactory;
+ this.isStopped = isStopped;
startStripe();
}
@@ -440,7 +444,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
}
- CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, bufferFactory, name);
+ CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, bufferFactory, isStopped);
or = cor;
List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
if (list == null) {
@@ -597,12 +601,17 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private List<MemoryBuffer> buffers = null;
private int lastBufferPos = -1;
private boolean suppressed = false;
+ private final AtomicBoolean isStopped;
+ private final StoppableAllocator allocator;
public CacheOutputReceiver(BufferUsageManager bufferManager,
- BufferObjectFactory bufferFactory, StreamName name) {
+ StreamName name, BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
this.bufferManager = bufferManager;
this.bufferFactory = bufferFactory;
+ Allocator alloc = bufferManager.getAllocator();
+ this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
this.name = name;
+ this.isStopped = isStopped;
}
public void clear() {
@@ -617,6 +626,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
lastBufferPos = -1;
}
+ private void allocateMultiple(MemoryBuffer[] dest, int size) {
+ if (allocator != null) {
+ allocator.allocateMultiple(dest, size, bufferFactory, isStopped);
+ } else {
+ bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
+ }
+ }
+
+
@Override
public void output(ByteBuffer buffer) throws IOException {
// TODO: avoid put() by working directly in OutStream?
@@ -640,7 +658,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
boolean isNewBuffer = (lastBufferPos == -1);
if (isNewBuffer) {
MemoryBuffer[] dest = new MemoryBuffer[1];
- bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
+ allocateMultiple(dest, size);
LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0];
bb = newBuffer.getByteBufferRaw();
lastBufferPos = bb.position();
@@ -1417,7 +1435,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
// TODO: move this into ctor? EW would need to create CacheWriter then
List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
- writer.isOnlyWritingIncludedColumns(), bufferFactory), daemonConf, split.getPath());
+ writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath());
if (writer instanceof VectorDeserializeOrcWriter) {
VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
asyncWriter.startAsync(new AsyncCacheDataCallback());
@@ -1673,7 +1691,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private boolean processStop() {
- if (!isStopped) return false;
+ if (!isStopped.get()) return false;
LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
cleanup(true);
return true;
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index 426d599..2b3bca6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
@@ -51,10 +54,10 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors;
private final MemoryManager memoryManager;
private final LowLevelCachePolicy policy;
- private final EvictionAwareAllocator allocator;
+ private final BuddyAllocator allocator;
private final LlapDaemonCacheMetrics metrics;
- public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager,
+ public MetadataCache(BuddyAllocator allocator, MemoryManager memoryManager,
LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) {
this.memoryManager = memoryManager;
this.allocator = allocator;
@@ -64,7 +67,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
}
- public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+ public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset, AtomicBoolean isStopped) {
if (estimateErrors == null) return;
OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
boolean isNew = false;
@@ -76,7 +79,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
errorData.addError(range.getOffset(), range.getLength(), baseOffset);
}
long memUsage = errorData.estimateMemoryUsage();
- memoryManager.reserveMemory(memUsage);
+ memoryManager.reserveMemory(memUsage, isStopped);
OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
if (old != null) {
errorData = old;
@@ -150,34 +153,49 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
}
@Override
- public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) {
- return putInternal(fileKey, tailBuffer, null);
+ public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
+ ByteBuffer tailBuffer) {
+ return putInternal(fileKey, tailBuffer, null, null);
}
@Override
- public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) {
- return putInternal(fileKey, tailBuffer, tag);
+ public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
+ ByteBuffer tailBuffer, String tag) {
+ return putInternal(fileKey, tailBuffer, tag, null);
+ }
+
+ @Override
+ public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+ InputStream is) throws IOException {
+ return putFileMetadata(fileKey, length, is, null, null);
}
public LlapBufferOrBuffers putStripeTail(
- OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) {
- return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag);
+ OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped);
}
@Override
- public LlapBufferOrBuffers putFileMetadata(
- Object fileKey, int length, InputStream is) throws IOException {
- return putFileMetadata(fileKey, length, is, null);
+ public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+ InputStream is, String tag) throws IOException {
+ return putFileMetadata(fileKey, length, is, tag, null);
+ }
+
+
+ @Override
+ public LlapBufferOrBuffers putFileMetadata(Object fileKey,
+ ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ return putInternal(fileKey, tailBuffer, tag, isStopped);
}
@Override
- public LlapBufferOrBuffers putFileMetadata(
- Object fileKey, int length, InputStream is, String tag) throws IOException {
+ public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is,
+ String tag, AtomicBoolean isStopped) throws IOException {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(fileKey);
if (oldVal == null) {
- result = wrapBbForFile(result, fileKey, length, is, tag);
+ result = wrapBbForFile(result, fileKey, length, is, tag, isStopped);
if (!lockBuffer(result, false)) {
throw new AssertionError("Cannot lock a newly created value " + result);
}
@@ -198,7 +216,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
@SuppressWarnings({ "rawtypes", "unchecked" })
private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
- Object fileKey, int length, InputStream stream, String tag) throws IOException {
+ Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException {
if (result != null) return result;
int maxAlloc = allocator.getMaxAllocation();
LlapMetadataBuffer<Object>[] largeBuffers = null;
@@ -207,7 +225,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
for (int i = 0; i < largeBuffers.length; ++i) {
largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey, tag);
}
- allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+ allocator.allocateMultiple(largeBuffers, maxAlloc, null, isStopped);
for (int i = 0; i < largeBuffers.length; ++i) {
readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
}
@@ -218,7 +236,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
} else {
LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag);
- allocator.allocateMultiple(smallBuffer, length, null);
+ allocator.allocateMultiple(smallBuffer, length, null, isStopped);
readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
if (largeBuffers == null) {
return smallBuffer[0]; // This is the overwhelmingly common case.
@@ -243,12 +261,12 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
bb.position(pos);
}
- private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) {
+ private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(key);
if (oldVal == null) {
- result = wrapBb(result, key, tailBuffer, tag);
+ result = wrapBb(result, key, tailBuffer, tag, isStopped);
oldVal = metadata.putIfAbsent(key, result);
if (oldVal == null) {
cacheInPolicy(result); // Cached successfully, add to policy.
@@ -306,11 +324,11 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
}
private <T> LlapBufferOrBuffers wrapBb(
- LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) {
+ LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
if (result != null) return result;
if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
// The common case by far.
- return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer);
+ return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer, isStopped);
} else {
int allocCount = determineAllocCount(tailBuffer);
@SuppressWarnings("unchecked")
@@ -318,22 +336,24 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
for (int i = 0; i < allocCount; ++i) {
results[i] = new LlapMetadataBuffer<T>(key, tag);
}
- wrapLargeBb(results, tailBuffer);
+ wrapLargeBb(results, tailBuffer, isStopped);
return new LlapMetadataBuffers<T>(results);
}
}
- private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer) {
+ private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer,
+ AtomicBoolean isStopped) {
// Note: we pass in null factory because we allocate objects here. We could also pass a
// per-call factory that would set fileKey; or set it after put.
- allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null);
+ allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null, isStopped);
return putBufferToDest(tailBuffer.duplicate(), result);
}
- private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer) {
+ private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer,
+ AtomicBoolean isStopped) {
// Note: we pass in null factory because we allocate objects here. We could also pass a
// per-call factory that would set fileKey; or set it after put.
- allocator.allocateMultiple(results, allocator.getMaxAllocation(), null);
+ allocator.allocateMultiple(results, allocator.getMaxAllocation(), null, isStopped);
ByteBuffer src = tailBuffer.duplicate();
int pos = src.position(), remaining = src.remaining();
for (int i = 0; i < results.length; ++i) {
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 1e6f3ac..b3179c0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
@@ -58,7 +59,7 @@ public class TestBuddyAllocator {
static class DummyMemoryManager implements MemoryManager {
@Override
- public void reserveMemory(long memoryToReserve) {
+ public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 6eb2eb5..923042d 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -85,7 +85,7 @@ public class TestLowLevelLrfuCachePolicy {
listLock.unlock();
}
// Now try to evict with locked buffer still in the list.
- mm.reserveMemory(1, false);
+ mm.reserveMemory(1, false, null);
assertSame(buffer2, et.evicted.get(0));
unlock(lrfu, buffer1);
}
@@ -237,7 +237,7 @@ public class TestLowLevelLrfuCachePolicy {
// Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
LlapDataBuffer locked = inserted.get(0);
lock(lrfu, locked);
- mm.reserveMemory(1, false);
+ mm.reserveMemory(1, false, null);
LlapDataBuffer evicted = et.evicted.get(0);
assertNotNull(evicted);
assertTrue(evicted.isInvalid());
@@ -248,7 +248,7 @@ public class TestLowLevelLrfuCachePolicy {
// Buffers in test are fakes not linked to cache; notify cache policy explicitly.
public boolean cache(LowLevelCacheMemoryManager mm,
LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapDataBuffer buffer) {
- if (mm != null && !mm.reserveMemory(1, false)) {
+ if (mm != null && !mm.reserveMemory(1, false, null)) {
return false;
}
buffer.incRef();
@@ -337,7 +337,7 @@ public class TestLowLevelLrfuCachePolicy {
lock(lrfu, buf);
}
assertEquals(heapSize, m.cacheUsed.get());
- assertFalse(mm.reserveMemory(1, false));
+ assertFalse(mm.reserveMemory(1, false, null));
if (!et.evicted.isEmpty()) {
assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
}
@@ -362,13 +362,13 @@ public class TestLowLevelLrfuCachePolicy {
// Evict all blocks.
et.evicted.clear();
for (int i = 0; i < inserted.size(); ++i) {
- assertTrue(mm.reserveMemory(1, false));
+ assertTrue(mm.reserveMemory(1, false, null));
if (cacheUsed != null) {
assertEquals(inserted.size(), cacheUsed.get());
}
}
// The map should now be empty.
- assertFalse(mm.reserveMemory(1, false));
+ assertFalse(mm.reserveMemory(1, false, null));
if (cacheUsed != null) {
assertEquals(inserted.size(), cacheUsed.get());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index df20f20..aa9d6ed 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -21,10 +21,14 @@ import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
@@ -75,8 +79,11 @@ public class TestOrcMetadataCache {
}
private static class DummyMemoryManager implements MemoryManager {
+ private int allocs;
+
@Override
- public void reserveMemory(long memoryToReserve) {
+ public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
+ ++allocs;
}
@Override
@@ -102,11 +109,11 @@ public class TestOrcMetadataCache {
ByteBuffer smallBuffer = ByteBuffer.allocate(MAX_ALLOC - 1);
rdm.nextBytes(smallBuffer.array());
- LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer);
+ LlapBufferOrBuffers result = cache.putFileMetadata(fileKey1, smallBuffer, null, null);
cache.decRefBuffer(result);
ByteBuffer cacheBuf = result.getSingleBuffer().getByteBufferDup();
assertEquals(smallBuffer, cacheBuf);
- result = cache.putFileMetadata(fileKey1, smallBuffer);
+ result = cache.putFileMetadata(fileKey1, smallBuffer, null, null);
cache.decRefBuffer(result);
cacheBuf = result.getSingleBuffer().getByteBufferDup();
assertEquals(smallBuffer, cacheBuf);
@@ -120,7 +127,7 @@ public class TestOrcMetadataCache {
ByteBuffer largeBuffer = ByteBuffer.allocate((int)(MAX_ALLOC * 2.5));
rdm.nextBytes(largeBuffer.array());
- result = cache.putFileMetadata(fileKey1, largeBuffer);
+ result = cache.putFileMetadata(fileKey1, largeBuffer, null, null);
cache.decRefBuffer(result);
assertNull(result.getSingleBuffer());
assertEquals(largeBuffer, extractResultBbs(result));
@@ -162,13 +169,13 @@ public class TestOrcMetadataCache {
Object fileKey1 = new Object();
// Note: incomplete CBs are always an exact match.
- cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0);
+ cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(0, 3) }, 0, null);
cp.verifyEquals(1);
DiskRangeList result = cache.getIncompleteCbs(
fileKey1, new DiskRangeList(0, 3), 0, gotAllData);
assertTrue(gotAllData.value);
verifyResult(result, INCOMPLETE, 0, 3);
- cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0);
+ cache.putIncompleteCbs(fileKey1, new DiskRange[] { new DiskRangeList(5, 6) }, 0, null);
cp.verifyEquals(3);
DiskRangeList ranges = new DiskRangeList(0, 3);
ranges.insertAfter(new DiskRangeList(4, 6));
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index dcb24b80..8370aa6 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -284,6 +284,7 @@ public class LlapCacheAwareFs extends FileSystem {
int extraOffsetInChunk = 0;
if (maxAlloc < chunkLength) {
largeBuffers = new MemoryBuffer[largeBufCount];
+ // Note: we don't use StoppableAllocator here - this is not on an IO thread.
allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
for (int i = 0; i < largeBuffers.length; ++i) {
// By definition here we copy up to the limit of the buffer.
@@ -301,6 +302,7 @@ public class LlapCacheAwareFs extends FileSystem {
largeBuffers = null;
if (smallSize > 0) {
smallBuffer = new MemoryBuffer[1];
+ // Note: we don't use StoppableAllocator here - this is not on an IO thread.
allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory());
ByteBuffer bb = smallBuffer[0].getByteBufferRaw();
copyDiskDataToCacheBuffer(array,
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index f6b949e..f3699f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
@@ -68,4 +69,6 @@ public interface EncodedReader {
void readIndexStreams(OrcIndex index, StripeInformation stripe,
List<OrcProto.Stream> streams, boolean[] included, boolean[] sargColumns)
throws IOException;
+
+ void setStopped(AtomicBoolean isStopped);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 63d1387..1b11e0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -146,6 +148,8 @@ class EncodedReaderImpl implements EncodedReader {
private final TypeDescription fileSchema;
private final WriterVersion version;
private final String tag;
+ private AtomicBoolean isStopped;
+ private StoppableAllocator allocator;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
@@ -162,6 +166,8 @@ class EncodedReaderImpl implements EncodedReader {
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
this.cacheWrapper = cacheWrapper;
+ Allocator alloc = cacheWrapper.getAllocator();
+ this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
this.dataReader = dataReader;
this.trace = trace;
this.tag = tag;
@@ -897,8 +903,7 @@ class EncodedReaderImpl implements EncodedReader {
}
boolean isAllocated = false;
try {
- cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
- cacheWrapper.getDataBufferFactory());
+ allocateMultiple(targetBuffers, bufferSize);
isAllocated = true;
} finally {
// toDecompress/targetBuffers contents are actually already added to some structures that
@@ -1206,8 +1211,7 @@ class EncodedReaderImpl implements EncodedReader {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
++ix;
}
- cacheWrapper.getAllocator().allocateMultiple(targetBuffers,
- (int)(partCount == 1 ? streamLen : partSize), cacheWrapper.getDataBufferFactory());
+ allocateMultiple(targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
// 4. Now copy the data into cache buffers.
ix = 0;
@@ -1260,8 +1264,7 @@ class EncodedReaderImpl implements EncodedReader {
// non-cached. Since we are at the first gap, the previous stuff must be contiguous.
singleAlloc[0] = null;
trace.logPartialUncompressedData(partOffset, candidateEnd, true);
- cacheWrapper.getAllocator().allocateMultiple(
- singleAlloc, (int)(candidateEnd - partOffset), cacheWrapper.getDataBufferFactory());
+ allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
@@ -1270,12 +1273,19 @@ class EncodedReaderImpl implements EncodedReader {
return tcc;
}
+ private void allocateMultiple(MemoryBuffer[] dest, int size) {
+ if (allocator != null) {
+ allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped);
+ } else {
+ cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory());
+ }
+ }
+
private CacheChunk copyAndReplaceUncompressedToNonCached(
BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
singleAlloc[0] = null;
trace.logPartialUncompressedData(bc.getOffset(), bc.getEnd(), false);
- cacheWrapper.getAllocator().allocateMultiple(
- singleAlloc, bc.getLength(), cacheWrapper.getDataBufferFactory());
+ allocateMultiple(singleAlloc, bc.getLength());
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
@@ -2152,4 +2162,9 @@ class EncodedReaderImpl implements EncodedReader {
return false;
}
}
+
+ @Override
+ public void setStopped(AtomicBoolean isStopped) {
+ this.isStopped = isStopped;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
new file mode 100644
index 0000000..0806d78
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+public interface StoppableAllocator extends Allocator {
+ /** Stoppable allocate method specific to branch-2. */
+ void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory,
+ AtomicBoolean isStopped) throws AllocatorOutOfMemoryException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f64efe2..8c49056 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -253,7 +253,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
requestedSchema = DataWritableReadSupport
.getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
-
+
Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
this.reader = new ParquetFileReader(
configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
@@ -317,7 +317,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
if (LOG.isInfoEnabled()) {
LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
}
- footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag);
+ // Note: we don't pass in isStopped here - this is not on an IO thread.
+ footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null);
try {
return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/5b2124b9/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index d1da7f5..e4aa888 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.common.io;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.io.IOException;
import java.io.InputStream;
@@ -39,6 +40,13 @@ public interface FileMetadataCache {
@Deprecated
MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer);
+ @Deprecated
+ MemoryBufferOrBuffers putFileMetadata(
+ Object fileKey, int length, InputStream is, String tag) throws IOException;
+
+ @Deprecated
+ MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
+
/**
* Releases the buffer returned from getFileMetadata or putFileMetadata method.
* @param buffer The buffer to release.
@@ -54,8 +62,9 @@ public interface FileMetadataCache {
* @return The buffer or buffers representing the cached footer.
* The caller must decref this buffer when done.
*/
- MemoryBufferOrBuffers putFileMetadata(
- Object fileKey, int length, InputStream is, String tag) throws IOException;
+ MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer,
+ String tag, AtomicBoolean isStopped);
- MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
-}
\ No newline at end of file
+ MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
+ InputStream is, String tag, AtomicBoolean isStopped) throws IOException;
+}