You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/29 22:42:32 UTC
[2/4] hive git commit: HIVE-14621 : LLAP: memory.mode = none has NPE (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-14621 : LLAP: memory.mode = none has NPE (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/406e935f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/406e935f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/406e935f
Branch: refs/heads/master
Commit: 406e935f27f60bb01c53d54bdb2c91429c95207e
Parents: 0705323
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Aug 29 15:37:36 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Aug 29 15:37:36 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 ++--
.../hive/llap/cache/EvictionDispatcher.java | 4 +--
.../hadoop/hive/llap/cache/LowLevelCache.java | 3 ++
.../hive/llap/cache/LowLevelCacheImpl.java | 5 +--
.../hadoop/hive/llap/cache/SimpleAllocator.java | 1 +
.../hive/llap/cache/SimpleBufferManager.java | 33 ++++++++++++++++--
.../hive/llap/io/api/impl/LlapIoImpl.java | 36 +++++++++-----------
.../llap/io/decode/OrcColumnVectorProducer.java | 3 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 11 +++---
.../resources/llap-daemon-log4j2.properties | 7 ++--
10 files changed, 71 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 14a538b..cb0d96f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2733,10 +2733,10 @@ public class HiveConf extends Configuration {
"Whether the LLAP IO layer is enabled for non-vectorized queries that read inputs\n" +
"that can be vectorized"),
LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
- new StringSet("cache", "allocator", "none"),
+ new StringSet("cache", "none"),
"LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
- "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
- "'none' doesn't use either (this mode may result in significant performance degradation)"),
+ "custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" +
+ "significant performance degradation)"),
LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(),
"Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
"padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index 91932e2..b6fd3e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
* Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
*/
public final class EvictionDispatcher implements EvictionListener {
- private final LowLevelCacheImpl dataCache;
+ private final LowLevelCache dataCache;
private final OrcMetadataCache metadataCache;
- public EvictionDispatcher(LowLevelCacheImpl dataCache, OrcMetadataCache metadataCache) {
+ public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
this.dataCache = dataCache;
this.metadataCache = metadataCache;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 1b61a6e..19c589a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -59,4 +59,7 @@ public interface LowLevelCache {
*/
long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+
+ /** Notifies the cache that a particular buffer should be removed due to eviction. */
+ void notifyEvicted(MemoryBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 8bc675d..ea458ca 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -67,7 +67,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
this.doAssumeGranularBlocks = doAssumeGranularBlocks;
}
- public void init() {
+ public void startThreads() {
if (cleanupInterval < 0) return;
cleanupThread = new CleanupThread(cleanupInterval);
cleanupThread.start();
@@ -368,7 +368,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
return fake;
}
- public final void notifyEvicted(LlapDataBuffer buffer) {
+ @Override
+ public final void notifyEvicted(MemoryBuffer buffer) {
allocator.deallocateEvicted(buffer);
newEvictions.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index 526ff22..d8f59d1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -68,6 +68,7 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
LlapDataBuffer buf = (LlapDataBuffer)buffer;
ByteBuffer bb = buf.byteBuffer;
buf.byteBuffer = null;
+ if (!bb.isDirect()) return;
Field field = cleanerField;
if (field == null) return;
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index b188c0e..d1eee04 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hive.llap.cache;
import java.util.List;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-public class SimpleBufferManager implements BufferUsageManager {
+public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
private final Allocator allocator;
private final LlapDaemonCacheMetrics metrics;
@@ -73,4 +76,30 @@ public class SimpleBufferManager implements BufferUsageManager {
public Allocator getAllocator() {
return allocator;
}
+
+ @Override
+ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+ return range; // Nothing changes - no cache.
+ }
+
+ @Override
+ public long[] putFileData(Object fileKey, DiskRange[] ranges,
+ MemoryBuffer[] chunks, long baseOffset, Priority priority,
+ LowLevelCacheCounters qfCounters) {
+ for (int i = 0; i < chunks.length; ++i) {
+ LlapDataBuffer buffer = (LlapDataBuffer)chunks[i];
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time (no cache)", buffer);
+ }
+ boolean canLock = lockBuffer(buffer);
+ assert canLock;
+ }
+ return null;
+ }
+
+ @Override
+ public void notifyEvicted(MemoryBuffer buffer) {
+ throw new UnsupportedOperationException("Buffer manager doesn't have cache");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 9deef0c..8048624 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
@@ -71,7 +72,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache");
public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking");
- private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
+ private static final String MODE_CACHE = "cache";
private final ColumnVectorProducer cvp;
private final ExecutorService executor;
@@ -82,9 +83,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private LlapIoImpl(Configuration conf) throws IOException {
String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
- boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
- useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
- LOG.info("Initializing LLAP IO in {} mode", ioMode);
+ boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode);
+ LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none");
String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
String sessionId = conf.get("llap.daemon.metrics.sessionid");
this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId);
@@ -109,7 +109,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
sessionId);
OrcMetadataCache metadataCache = null;
- LowLevelCacheImpl orcCache = null;
+ LowLevelCache cache = null;
BufferUsageManager bufferManager = null;
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -122,23 +122,21 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
// Cache uses allocator to allocate and deallocate, create allocator and then caches.
EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
this.allocator = allocator;
- orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+ LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
+ cacheMetrics, cachePolicy, allocator, true);
+ cache = cacheImpl;
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
- cachePolicy.setParentDebugDumper(orcCache);
- orcCache.init(); // Start the cache threads.
- bufferManager = orcCache; // Cache also serves as buffer manager.
+ cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+ cachePolicy.setParentDebugDumper(cacheImpl);
+ cacheImpl.startThreads(); // Start the cache threads.
+ bufferManager = cacheImpl; // Cache also serves as buffer manager.
} else {
- if (useAllocOnly) {
- LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
- conf, null, cacheMetrics);
- allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
- } else {
- allocator = new SimpleAllocator(conf);
- }
- bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
+ this.allocator = new SimpleAllocator(conf);
+ SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
+ bufferManager = sbm;
+ cache = sbm;
}
// IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
@@ -148,7 +146,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
// TODO: this should depends on input format and be in a map, or something.
this.cvp = new OrcColumnVectorProducer(
- metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics);
+ metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
LOG.info("LLAP IO initialized");
registerMXBeans();
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 0a8e3df..12275ac 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -48,7 +47,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
private LlapDaemonIOMetrics ioMetrics;
public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
- LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager,
+ LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) {
LlapIoImpl.LOG.info("Initializing ORC column vector producer");
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 0d212ec..eb8ee6c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -835,12 +835,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
- DiskRangeList result = (lowLevelCache == null) ? range
- : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData);
+ DiskRangeList result = lowLevelCache.getFileData(
+ fileKey, range, baseOffset, factory, counters, gotAllData);
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
- LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey
- + ", base offset " + baseOffset + "): "
- + RecordReaderUtils.stringifyDiskRanges(range.next));
+ LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey +
+ ", base offset " + baseOffset + "): " + RecordReaderUtils.stringifyDiskRanges(range));
}
if (gotAllData.value) return result;
return (metadataCache == null) ? range
@@ -851,7 +850,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
if (data != null) {
- return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
+ return lowLevelCache.putFileData(
fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
} else if (metadataCache != null) {
metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 422a92a..0c953d1 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -100,7 +100,10 @@ appender.query-routing.routes.route-mdc.file-mdc.app.layout.type = PatternLayout
appender.query-routing.routes.route-mdc.file-mdc.app.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
# list of all loggers
-loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
+loggers = EncodedReader, NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
+
+logger.EncodedReader.name = org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl
+logger.EncodedReader.level = INFO
logger.LlapIoImpl.name = LlapIoImpl
logger.LlapIoImpl.level = INFO
@@ -109,7 +112,7 @@ logger.LlapIoOrc.name = LlapIoOrc
logger.LlapIoOrc.level = WARN
logger.LlapIoCache.name = LlapIoCache
-logger.LlapIOCache.level = WARN
+logger.LlapIoCache.level = WARN
logger.LlapIoLocking.name = LlapIoLocking
logger.LlapIoLocking.level = WARN