You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/29 22:42:32 UTC

[2/4] hive git commit: HIVE-14621 : LLAP: memory.mode = none has NPE (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-14621 : LLAP: memory.mode = none has NPE (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/406e935f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/406e935f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/406e935f

Branch: refs/heads/master
Commit: 406e935f27f60bb01c53d54bdb2c91429c95207e
Parents: 0705323
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Aug 29 15:37:36 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Aug 29 15:37:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  6 ++--
 .../hive/llap/cache/EvictionDispatcher.java     |  4 +--
 .../hadoop/hive/llap/cache/LowLevelCache.java   |  3 ++
 .../hive/llap/cache/LowLevelCacheImpl.java      |  5 +--
 .../hadoop/hive/llap/cache/SimpleAllocator.java |  1 +
 .../hive/llap/cache/SimpleBufferManager.java    | 33 ++++++++++++++++--
 .../hive/llap/io/api/impl/LlapIoImpl.java       | 36 +++++++++-----------
 .../llap/io/decode/OrcColumnVectorProducer.java |  3 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 11 +++---
 .../resources/llap-daemon-log4j2.properties     |  7 ++--
 10 files changed, 71 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 14a538b..cb0d96f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2733,10 +2733,10 @@ public class HiveConf extends Configuration {
         "Whether the LLAP IO layer is enabled for non-vectorized queries that read inputs\n" +
         "that can be vectorized"),
     LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
-        new StringSet("cache", "allocator", "none"),
+        new StringSet("cache", "none"),
         "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
-        "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
-        "'none' doesn't use either (this mode may result in significant performance degradation)"),
+        "custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" +
+        "significant performance degradation)"),
     LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(),
         "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
         "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index 91932e2..b6fd3e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
  * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
  */
 public final class EvictionDispatcher implements EvictionListener {
-  private final LowLevelCacheImpl dataCache;
+  private final LowLevelCache dataCache;
   private final OrcMetadataCache metadataCache;
 
-  public EvictionDispatcher(LowLevelCacheImpl dataCache, OrcMetadataCache metadataCache) {
+  public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 1b61a6e..19c589a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -59,4 +59,7 @@ public interface LowLevelCache {
    */
   long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+
+  /** Notifies the cache that a particular buffer should be removed due to eviction. */
+  void notifyEvicted(MemoryBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 8bc675d..ea458ca 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -67,7 +67,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
     this.doAssumeGranularBlocks = doAssumeGranularBlocks;
   }
 
-  public void init() {
+  public void startThreads() {
     if (cleanupInterval < 0) return;
     cleanupThread = new CleanupThread(cleanupInterval);
     cleanupThread.start();
@@ -368,7 +368,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
     return fake;
   }
 
-  public final void notifyEvicted(LlapDataBuffer buffer) {
+  @Override
+  public final void notifyEvicted(MemoryBuffer buffer) {
     allocator.deallocateEvicted(buffer);
     newEvictions.incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index 526ff22..d8f59d1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -68,6 +68,7 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
     LlapDataBuffer buf = (LlapDataBuffer)buffer;
     ByteBuffer bb = buf.byteBuffer;
     buf.byteBuffer = null;
+    if (!bb.isDirect()) return;
     Field field = cleanerField;
     if (field == null) return;
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index b188c0e..d1eee04 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hive.llap.cache;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 
-public class SimpleBufferManager implements BufferUsageManager {
+public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
   private final Allocator allocator;
   private final LlapDaemonCacheMetrics metrics;
 
@@ -73,4 +76,30 @@ public class SimpleBufferManager implements BufferUsageManager {
   public Allocator getAllocator() {
     return allocator;
   }
+
+  @Override
+  public DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
+      DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+    return range; // Nothing changes - no cache.
+  }
+
+  @Override
+  public long[] putFileData(Object fileKey, DiskRange[] ranges,
+      MemoryBuffer[] chunks, long baseOffset, Priority priority,
+      LowLevelCacheCounters qfCounters) {
+    for (int i = 0; i < chunks.length; ++i) {
+      LlapDataBuffer buffer = (LlapDataBuffer)chunks[i];
+      if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time (no cache)", buffer);
+      }
+      boolean canLock = lockBuffer(buffer);
+      assert canLock;
+    }
+    return null;
+  }
+
+  @Override
+  public void notifyEvicted(MemoryBuffer buffer) {
+    throw new UnsupportedOperationException("Buffer manager doesn't have cache");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 9deef0c..8048624 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
@@ -71,7 +72,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   public static final Logger CACHE_LOGGER = LoggerFactory.getLogger("LlapIoCache");
   public static final Logger LOCKING_LOGGER = LoggerFactory.getLogger("LlapIoLocking");
 
-  private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
+  private static final String MODE_CACHE = "cache";
 
   private final ColumnVectorProducer cvp;
   private final ExecutorService executor;
@@ -82,9 +83,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
   private LlapIoImpl(Configuration conf) throws IOException {
     String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
-    boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
-        useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
-    LOG.info("Initializing LLAP IO in {} mode", ioMode);
+    boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode);
+    LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none");
     String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
     String sessionId = conf.get("llap.daemon.metrics.sessionid");
     this.cacheMetrics = LlapDaemonCacheMetrics.create(displayName, sessionId);
@@ -109,7 +109,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
         sessionId);
 
     OrcMetadataCache metadataCache = null;
-    LowLevelCacheImpl orcCache = null;
+    LowLevelCache cache = null;
     BufferUsageManager bufferManager = null;
     if (useLowLevelCache) {
       // Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -122,23 +122,21 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       // Cache uses allocator to allocate and deallocate, create allocator and then caches.
       EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
       this.allocator = allocator;
-      orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+      LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
+          cacheMetrics, cachePolicy, allocator, true);
+      cache = cacheImpl;
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
       metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
-      cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
-      cachePolicy.setParentDebugDumper(orcCache);
-      orcCache.init(); // Start the cache threads.
-      bufferManager = orcCache; // Cache also serves as buffer manager.
+      cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+      cachePolicy.setParentDebugDumper(cacheImpl);
+      cacheImpl.startThreads(); // Start the cache threads.
+      bufferManager = cacheImpl; // Cache also serves as buffer manager.
     } else {
-      if (useAllocOnly) {
-        LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
-            conf, null, cacheMetrics);
-        allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
-      } else {
-        allocator = new SimpleAllocator(conf);
-      }
-      bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
+      this.allocator = new SimpleAllocator(conf);
+      SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
+      bufferManager = sbm;
+      cache = sbm;
     }
     // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
@@ -148,7 +146,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
     // TODO: this should depends on input format and be in a map, or something.
     this.cvp = new OrcColumnVectorProducer(
-        metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics);
+        metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
     LOG.info("LLAP IO initialized");
 
     registerMXBeans();

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 0a8e3df..12275ac 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -48,7 +47,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
   private LlapDaemonIOMetrics ioMetrics;
 
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
-      LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager,
+      LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
       Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) {
     LlapIoImpl.LOG.info("Initializing ORC column vector producer");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 0d212ec..eb8ee6c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -835,12 +835,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
         long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
-      DiskRangeList result = (lowLevelCache == null) ? range
-          : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData);
+      DiskRangeList result = lowLevelCache.getFileData(
+          fileKey, range, baseOffset, factory, counters, gotAllData);
       if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-        LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey
-            + ", base offset " + baseOffset + "): "
-            + RecordReaderUtils.stringifyDiskRanges(range.next));
+        LlapIoImpl.ORC_LOGGER.trace("Disk ranges after data cache (file " + fileKey +
+            ", base offset " + baseOffset + "): " + RecordReaderUtils.stringifyDiskRanges(range));
       }
       if (gotAllData.value) return result;
       return (metadataCache == null) ? range
@@ -851,7 +850,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
       if (data != null) {
-        return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
+        return lowLevelCache.putFileData(
             fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
       } else if (metadataCache != null) {
         metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);

http://git-wip-us.apache.org/repos/asf/hive/blob/406e935f/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 422a92a..0c953d1 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -100,7 +100,10 @@ appender.query-routing.routes.route-mdc.file-mdc.app.layout.type = PatternLayout
 appender.query-routing.routes.route-mdc.file-mdc.app.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # list of all loggers
-loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
+loggers = EncodedReader, NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
+
+logger.EncodedReader.name = org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl
+logger.EncodedReader.level = INFO
 
 logger.LlapIoImpl.name = LlapIoImpl
 logger.LlapIoImpl.level = INFO
@@ -109,7 +112,7 @@ logger.LlapIoOrc.name = LlapIoOrc
 logger.LlapIoOrc.level = WARN
 
 logger.LlapIoCache.name = LlapIoCache
-logger.LlapIOCache.level = WARN
+logger.LlapIoCache.level = WARN
 
 logger.LlapIoLocking.name = LlapIoLocking
 logger.LlapIoLocking.level = WARN