You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/06 21:30:52 UTC
[1/2] hive git commit: HIVE-12597 : LLAP - allow using elevator
without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-2.0 410b0dd3d -> 3133ac5a3
refs/heads/master 20c549908 -> 8069b59a0
HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8069b59a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8069b59a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8069b59a
Branch: refs/heads/master
Commit: 8069b59a0580c53dbcad0acffadb659104651f0d
Parents: 20c5499
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 6 12:29:55 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 6 12:29:55 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 28 +++--
data/conf/hive-site.xml | 2 +-
data/conf/llap/hive-site.xml | 2 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 10 +-
.../hive/llap/cache/BufferUsageManager.java | 39 +++++++
.../hadoop/hive/llap/cache/LowLevelCache.java | 14 ---
.../hive/llap/cache/LowLevelCacheImpl.java | 8 +-
.../llap/cache/LowLevelCacheMemoryManager.java | 8 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 4 +-
.../hadoop/hive/llap/cache/SimpleAllocator.java | 116 +++++++++++++++++++
.../hive/llap/cache/SimpleBufferManager.java | 78 +++++++++++++
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 12 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 12 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 59 ++++++----
.../llap/io/decode/OrcColumnVectorProducer.java | 11 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 44 ++++---
.../hive/llap/cache/TestBuddyAllocator.java | 8 +-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 6 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 4 +-
19 files changed, 358 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ffe0d9a..479fa46 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2337,21 +2337,25 @@ public class HiveConf extends Configuration {
true,
"Updates tez job execution progress in-place in the terminal."),
LLAP_IO_ENABLED("hive.llap.io.enabled", false, "Whether the LLAP IO layer is enabled."),
- LLAP_LOW_LEVEL_CACHE("hive.llap.io.use.lowlevel.cache", true, "Must always be true for now"),
- LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.io.cache.orc.alloc.min", 128 * 1024,
- "Minimum allocation possible from LLAP low-level cache for ORC. Allocations below that\n" +
- "will be padded to minimum allocation. Should generally be the same as expected ORC\n" +
- "compression buffer size, or next lowest power of 2. Must be power of 2."),
- LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
- "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
- "the largest expected ORC compression buffer size. Must be power of 2."),
- LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+ LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
+ new StringSet("cache", "allocator", "none"),
+ "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
+ "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
+ "'none' doesn't use either (this mode may result in significant performance degradation)"),
+ LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", 128 * 1024,
+ "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
+ "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
+ "compression buffer size, or next lowest power of 2. Must be a power of 2."),
+ LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", 16 * 1024 * 1024,
+ "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
+ "the largest expected ORC compression buffer size. Must be a power of 2."),
+ LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
"Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
"(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
"not the case, an adjusted size will be used. Using powers of 2 is recommended."),
- LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
- "Maximum size for ORC low-level cache; must be a multiple of arena size."),
- LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
+ LLAP_IO_MEMORY_MAX_SIZE("hive.llap.io.memory.size", 1024L * 1024 * 1024,
+ "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
+ LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
"Whether ORC low-level cache should use direct allocation."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", false,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 44f75bf..cbb5546 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -293,7 +293,7 @@
<property>
- <name>hive.llap.io.cache.direct</name>
+ <name>hive.llap.io.allocator.direct</name>
<value>false</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index f768601..c2bef58 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -264,7 +264,7 @@
</property>
<property>
- <name>hive.llap.io.cache.direct</name>
+ <name>hive.llap.io.allocator.direct</name>
<value>false</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 0c96efa..ab4df5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -46,11 +46,11 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private static final int MAX_ARENA_SIZE = 1024*1024*1024;
public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
LlapDaemonCacheMetrics metrics) {
- isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
- minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
- maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
- int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
- long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+ minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC);
+ int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT);
+ long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
if (LlapIoImpl.LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
new file mode 100644
index 0000000..a441dc4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+/**
+ * The buffer usage (refcount) tracker. Buffer usage can either be tracked by a cache, which
+ * will handle the coordination with eviction, or by a simple refcount manager to facilitate
+ * sharing when the buffer contains data for multiple readers.
+ */
+public interface BufferUsageManager {
+ Allocator getAllocator();
+
+ void decRefBuffer(MemoryBuffer buffer);
+
+ void decRefBuffers(List<MemoryBuffer> buffers);
+
+ boolean incRefBuffer(MemoryBuffer buffer);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index b17edb5..17d9fdf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.llap.cache;
-import java.util.List;
-
-import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -62,15 +59,4 @@ public interface LowLevelCache {
*/
long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
-
- Allocator getAllocator();
-
- /**
- * Releases the buffer returned by getFileData.
- */
- void releaseBuffer(MemoryBuffer buffer);
-
- void releaseBuffers(List<MemoryBuffer> cacheBuffers);
-
- boolean reuseBuffer(MemoryBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index c2a130a..1132171 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import com.google.common.annotations.VisibleForTesting;
-public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
+public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
private final EvictionAwareAllocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -334,12 +334,12 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public void releaseBuffer(MemoryBuffer buffer) {
+ public void decRefBuffer(MemoryBuffer buffer) {
unlockBuffer((LlapDataBuffer)buffer, true);
}
@Override
- public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
for (MemoryBuffer b : cacheBuffers) {
unlockBuffer((LlapDataBuffer)b, true);
}
@@ -502,7 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public boolean reuseBuffer(MemoryBuffer buffer) {
+ public boolean incRefBuffer(MemoryBuffer buffer) {
// notifyReused implies that buffer is already locked; it's also called once for new
// buffers that are not cached yet. Don't notify cache policy.
return lockBuffer(((LlapDataBuffer)buffer), false);
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 8788e15..992da8e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -39,13 +39,14 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor,
LlapDaemonCacheMetrics metrics) {
- this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
this.evictor = evictor;
this.usedMemory = new AtomicLong(0);
this.metrics = metrics;
metrics.setCacheCapacityTotal(maxSize);
if (LlapIoImpl.LOGL.isInfoEnabled()) {
- LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
+ LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and "
+ + ((evictor == null) ? "no " : "") + "ability to evict blocks");
}
}
@@ -65,6 +66,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
}
continue;
}
+ if (evictor == null) return false;
// TODO: for one-block case, we could move notification for the last block out of the loop.
long evicted = evictor.evictSomeBlocks(remainingToReserve);
if (evicted == 0) {
@@ -107,6 +109,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
@Override
public void forceReservedMemory(int memoryToEvict) {
+ if (evictor == null) return;
while (memoryToEvict > 0) {
long evicted = evictor.evictSomeBlocks(memoryToEvict);
if (evicted == 0) return;
@@ -126,6 +129,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
@Override
public String debugDumpForOom() {
+ if (evictor == null) return null;
return "cache state\n" + evictor.debugDumpForOom();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 40cb92d..84910d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -67,8 +67,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private LlapOomDebugDump parentDebugDump;
public LowLevelLrfuCachePolicy(Configuration conf) {
- long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
int maxHeapSize = -1;
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
new file mode 100644
index 0000000..526ff22
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import sun.misc.Cleaner;
+
+public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
+ private final boolean isDirect;
+ private static Field cleanerField;
+ static {
+ ByteBuffer tmp = ByteBuffer.allocateDirect(1);
+ try {
+ cleanerField = tmp.getClass().getDeclaredField("cleaner");
+ cleanerField.setAccessible(true);
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
+ cleanerField = null;
+ }
+ }
+
+ public SimpleAllocator(Configuration conf) {
+ isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+ if (LlapIoImpl.LOG.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("Simple allocator with " + (isDirect ? "direct" : "byte") + " buffers");
+ }
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size) {
+ for (int i = 0; i < dest.length; ++i) {
+ LlapDataBuffer buf = null;
+ if (dest[i] == null) {
+ dest[i] = buf = createUnallocated();
+ } else {
+ buf = (LlapDataBuffer)dest[i];
+ }
+ ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ buf.initialize(0, bb, 0, size);
+ }
+ }
+
+ @Override
+ public void deallocate(MemoryBuffer buffer) {
+ LlapDataBuffer buf = (LlapDataBuffer)buffer;
+ ByteBuffer bb = buf.byteBuffer;
+ buf.byteBuffer = null;
+ Field field = cleanerField;
+ if (field == null) return;
+ try {
+ ((Cleaner)field.get(bb)).clean();
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.warn("Error using DirectByteBuffer cleaner; stopping its use", t);
+ cleanerField = null;
+ }
+ }
+
+ @Override
+ public boolean isDirectAlloc() {
+ return isDirect;
+ }
+
+ @Override
+ public LlapDataBuffer createUnallocated() {
+ return new LlapDataBuffer();
+ }
+
+ // BuddyAllocatorMXBean
+ @Override
+ public boolean getIsDirect() {
+ return isDirect;
+ }
+
+ @Override
+ public int getMinAllocation() {
+ return 0;
+ }
+
+ @Override
+ public int getMaxAllocation() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int getArenaSize() {
+ return -1;
+ }
+
+ @Override
+ public long getMaxCacheSize() {
+ return Integer.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
new file mode 100644
index 0000000..734a5c0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+
+public class SimpleBufferManager implements BufferUsageManager {
+ private final Allocator allocator;
+ private final LlapDaemonCacheMetrics metrics;
+
+ public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) {
+ if (LlapIoImpl.LOGL.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("Simple buffer manager");
+ }
+ this.allocator = allocator;
+ this.metrics = metrics;
+ }
+
+ private boolean lockBuffer(LlapDataBuffer buffer) {
+ int rc = buffer.incRef();
+ if (rc <= 0) return false;
+ metrics.incrCacheNumLockedBuffers();
+ return true;
+ }
+
+ private void unlockBuffer(LlapDataBuffer buffer) {
+ if (buffer.decRef() == 0) {
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+ }
+ allocator.deallocate(buffer);
+ }
+ metrics.decrCacheNumLockedBuffers();
+ }
+
+ @Override
+ public void decRefBuffer(MemoryBuffer buffer) {
+ unlockBuffer((LlapDataBuffer)buffer);
+ }
+
+ @Override
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
+ unlockBuffer((LlapDataBuffer)b);
+ }
+ }
+
+ @Override
+ public boolean incRefBuffer(MemoryBuffer buffer) {
+ return lockBuffer((LlapDataBuffer)buffer);
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return allocator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 8e5377f..4b330f4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -145,7 +145,7 @@ public class LlapServiceDriver {
Preconditions.checkArgument(options.getXmx() < options.getSize(),
"Working memory has to be smaller than the container sizing");
}
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
"Working memory + cache has to be smaller than the containing sizing ");
}
@@ -165,7 +165,7 @@ public class LlapServiceDriver {
}
if (options.getCache() != -1) {
- conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, options.getCache());
+ conf.setLong(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, options.getCache());
}
if (options.getXmx() != -1) {
@@ -278,11 +278,11 @@ public class LlapServiceDriver {
configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
- configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
- HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+ configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+ HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
- configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+ configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 467ab71..b3057c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -290,14 +290,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
- long cacheMemoryBytes =
- HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- boolean isDirectCache =
- HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
+
+ long ioMemoryBytes = HiveConf.getLongVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
- llapDaemon =
- new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
- cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+ llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled,
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index b38f472..d2c1907 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,8 +28,10 @@ import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
@@ -38,7 +40,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
-import org.apache.hadoop.hive.llap.cache.NoopCache;
+import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
+import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
@@ -59,19 +62,21 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class);
public static final LogLevels LOGL = new LogLevels(LOG);
+ private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
private final ColumnVectorProducer cvp;
private final ListeningExecutorService executor;
private LlapDaemonCacheMetrics cacheMetrics;
private LlapDaemonQueueMetrics queueMetrics;
private ObjectName buddyAllocatorMXBean;
- private EvictionAwareAllocator allocator;
+ private Allocator allocator;
private LlapIoImpl(Configuration conf) throws IOException {
- boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
- // High-level cache not supported yet.
+ String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
+ boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
+ useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
if (LOGL.isInfoEnabled()) {
- LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : ""));
+ LOG.info("Initializing LLAP IO in " + ioMode + " mode");
}
String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
@@ -86,35 +91,47 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
LOG.info("Started llap daemon metrics with displayName: " + displayName +
" sessionId: " + sessionId);
- Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
- boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
- LowLevelCachePolicy cachePolicy =
- useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
- LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
- conf, cachePolicy, cacheMetrics);
- // Memory manager uses cache policy to trigger evictions.
- OrcMetadataCache metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+ Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
+
+ OrcMetadataCache metadataCache = null;
LowLevelCacheImpl orcCache = null;
+ BufferUsageManager bufferManager = null;
if (useLowLevelCache) {
- // Allocator uses memory manager to request memory.
- allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
- // Cache uses allocator to allocate and deallocate.
+ // Memory manager uses cache policy to trigger evictions, so create the policy first.
+ boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
+ LowLevelCachePolicy cachePolicy =
+ useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+ // Allocator uses memory manager to request memory, so create the manager next.
+ LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+ conf, cachePolicy, cacheMetrics);
+ // Cache uses allocator to allocate and deallocate, create allocator and then caches.
+ EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ this.allocator = allocator;
orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+ metadataCache = new OrcMetadataCache(memManager, cachePolicy);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
cachePolicy.setParentDebugDumper(orcCache);
- orcCache.init();
+ orcCache.init(); // Start the cache threads.
+ bufferManager = orcCache; // Cache also serves as buffer manager.
} else {
- cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));
+ if (useAllocOnly) {
+ LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+ conf, null, cacheMetrics);
+ allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ } else {
+ allocator = new SimpleAllocator(conf);
+ }
+ bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
}
- // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?)
+ // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
// TODO: this should depends on input format and be in a map, or something.
- this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics,
- queueMetrics);
+ this.cvp = new OrcColumnVectorProducer(
+ metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
if (LOGL.isInfoEnabled()) {
LOG.info("LLAP IO initialized");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index e156eaa..18191da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -42,20 +43,22 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
private final OrcMetadataCache metadataCache;
private final Cache<OrcCacheKey> cache;
private final LowLevelCache lowLevelCache;
+ private final BufferUsageManager bufferManager;
private final Configuration conf;
private boolean _skipCorrupt; // TODO: get rid of this
private LlapDaemonCacheMetrics cacheMetrics;
private LlapDaemonQueueMetrics queueMetrics;
public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
- LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
- LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
+ LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+ Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
if (LlapIoImpl.LOGL.isInfoEnabled()) {
LlapIoImpl.LOG.info("Initializing ORC column vector producer");
}
this.metadataCache = metadataCache;
this.lowLevelCache = lowLevelCache;
+ this.bufferManager = bufferManager;
this.cache = cache;
this.conf = conf;
this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
@@ -71,8 +74,8 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, queueMetrics);
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
- conf, split, columnIds, sarg, columnNames, edc, counters);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+ metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
edc.init(reader, reader);
return edc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 58d2ac8..3ddfc29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -133,6 +134,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
+ private final BufferUsageManager bufferManager;
private final Configuration conf;
private final Cache<OrcCacheKey> cache;
private final FileSplit split;
@@ -160,12 +162,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@SuppressWarnings("unused")
private volatile boolean isPaused = false;
- public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
- OrcMetadataCache metadataCache, Configuration conf, FileSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
+ Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
+ FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
+ this.bufferManager = bufferManager;
this.cache = cache;
this.conf = conf;
this.split = split;
@@ -382,7 +385,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if (stripeMetadatas != null) {
stripeMetadata = stripeMetadatas.get(stripeIxMod);
} else {
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
}
@@ -394,7 +397,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
stripeMetadata = new OrcStripeMetadata(
stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
if (DebugUtils.isTraceOrcEnabled()) {
LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -510,11 +513,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
- int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
- + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory");
+ + HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.toString() + " to avoid wasting memory");
}
}
@@ -617,18 +620,20 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
*/
private OrcFileMetadata getOrReadFileMetadata() throws IOException {
OrcFileMetadata metadata = null;
- if (fileId != null) {
+ if (fileId != null && metadataCache != null) {
metadata = metadataCache.getFileMetadata(fileId);
if (metadata != null) {
counters.incrCounter(Counter.METADATA_CACHE_HIT);
return metadata;
+ } else {
+ counters.incrCounter(Counter.METADATA_CACHE_MISS);
}
- counters.incrCounter(Counter.METADATA_CACHE_MISS);
}
ensureOrcReader();
// We assume this call doesn't touch HDFS because everything is already read; don't add time.
metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
- return (fileId == null) ? metadata : metadataCache.putFileMetadata(metadata);
+ if (fileId == null || metadataCache == null) return metadata;
+ return metadataCache.putFileMetadata(metadata);
}
/**
@@ -643,7 +648,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
OrcStripeMetadata value = null;
int stripeIx = stripeIxMod + stripeIxFrom;
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
value = metadataCache.getStripeMetadata(stripeKey);
}
@@ -655,7 +660,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
long startTime = counters.startTimeCounter();
value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
value = metadataCache.putStripeMetadata(value);
if (DebugUtils.isTraceOrcEnabled()) {
LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -701,7 +706,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
}
}
- lowLevelCache.releaseBuffers(data.getCacheBuffers());
+ bufferManager.decRefBuffers(data.getCacheBuffers());
CSD_POOL.offer(data);
}
}
@@ -939,7 +944,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public DataWrapperForOrc() {
boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
- if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
+ if (useZeroCopy && !getAllocator().isDirectAlloc()) {
throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
+ "buffers; either disable zero-copy or enable direct cache allocation");
}
@@ -949,30 +954,31 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public DiskRangeList getFileData(long fileId, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
- return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+ return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
+ fileId, range, baseOffset, factory, counters, gotAllData);
}
@Override
public long[] putFileData(long fileId, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
- return lowLevelCache.putFileData(
+ return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
}
@Override
public void releaseBuffer(MemoryBuffer buffer) {
- lowLevelCache.releaseBuffer(buffer);
+ bufferManager.decRefBuffer(buffer);
}
@Override
public void reuseBuffer(MemoryBuffer buffer) {
- boolean isReused = lowLevelCache.reuseBuffer(buffer);
+ boolean isReused = bufferManager.incRefBuffer(buffer);
assert isReused;
}
@Override
public Allocator getAllocator() {
- return lowLevelCache.getAllocator();
+ return bufferManager.getAllocator();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6375996..fc014a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -282,10 +282,10 @@ public class TestBuddyAllocator {
private Configuration createConf(int min, int max, int arena, int total) {
Configuration conf = new Configuration();
- conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
- conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
- conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
- conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_MAX_ALLOC.varname, max);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_ARENA_COUNT.varname, total/arena);
+ conf.setLong(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, total);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 8324b21..0846db9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -348,7 +348,7 @@ public class TestLowLevelCacheImpl {
++gets;
LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
- cache.releaseBuffer(result);
+ cache.decRefBuffer(result);
iter = iter.next;
}
} else {
@@ -378,7 +378,7 @@ public class TestLowLevelCacheImpl {
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
}
maskVal >>= 1;
- cache.releaseBuffer(buf);
+ cache.decRefBuffer(buf);
}
}
}
@@ -411,7 +411,7 @@ public class TestLowLevelCacheImpl {
DiskRange r = results[index];
if (r instanceof CacheChunk) {
LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
- cache.releaseBuffer(result);
+ cache.decRefBuffer(result);
if (victim == null && result.invalidate()) {
++evictions;
victim = result;
http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index d0abfa3..46e9547 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -145,8 +145,8 @@ public class TestLowLevelLrfuCachePolicy {
private Configuration createConf(int min, int heapSize, Double lambda) {
Configuration conf = new Configuration();
- conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
- conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize);
+ conf.setInt(HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+ conf.setInt(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, heapSize);
if (lambda != null) {
conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, lambda.doubleValue());
}
[2/2] hive git commit: HIVE-12597 : LLAP - allow using elevator
without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3133ac5a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3133ac5a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3133ac5a
Branch: refs/heads/branch-2.0
Commit: 3133ac5a37d7a558278284bc031924b0cb98d51e
Parents: 410b0dd
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 6 12:29:55 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 6 12:30:16 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 28 +++--
data/conf/hive-site.xml | 2 +-
data/conf/llap/hive-site.xml | 2 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 10 +-
.../hive/llap/cache/BufferUsageManager.java | 39 +++++++
.../hadoop/hive/llap/cache/LowLevelCache.java | 14 ---
.../hive/llap/cache/LowLevelCacheImpl.java | 8 +-
.../llap/cache/LowLevelCacheMemoryManager.java | 8 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 4 +-
.../hadoop/hive/llap/cache/SimpleAllocator.java | 116 +++++++++++++++++++
.../hive/llap/cache/SimpleBufferManager.java | 78 +++++++++++++
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 12 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 12 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 59 ++++++----
.../llap/io/decode/OrcColumnVectorProducer.java | 11 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 44 ++++---
.../hive/llap/cache/TestBuddyAllocator.java | 8 +-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 6 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 4 +-
19 files changed, 358 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a831d09..caf7842 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2325,21 +2325,25 @@ public class HiveConf extends Configuration {
true,
"Updates tez job execution progress in-place in the terminal."),
LLAP_IO_ENABLED("hive.llap.io.enabled", false, "Whether the LLAP IO layer is enabled."),
- LLAP_LOW_LEVEL_CACHE("hive.llap.io.use.lowlevel.cache", true, "Must always be true for now"),
- LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.io.cache.orc.alloc.min", 128 * 1024,
- "Minimum allocation possible from LLAP low-level cache for ORC. Allocations below that\n" +
- "will be padded to minimum allocation. Should generally be the same as expected ORC\n" +
- "compression buffer size, or next lowest power of 2. Must be power of 2."),
- LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
- "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
- "the largest expected ORC compression buffer size. Must be power of 2."),
- LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+ LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
+ new StringSet("cache", "allocator", "none"),
+ "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
+ "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
+ "'none' doesn't use either (this mode may result in significant performance degradation)"),
+ LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", 128 * 1024,
+ "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
+ "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
+ "compression buffer size, or next lowest power of 2. Must be a power of 2."),
+ LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", 16 * 1024 * 1024,
+ "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
+ "the largest expected ORC compression buffer size. Must be a power of 2."),
+ LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
"Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
"(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
"not the case, an adjusted size will be used. Using powers of 2 is recommended."),
- LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
- "Maximum size for ORC low-level cache; must be a multiple of arena size."),
- LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
+ LLAP_IO_MEMORY_MAX_SIZE("hive.llap.io.memory.size", 1024L * 1024 * 1024,
+ "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
+ LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
"Whether ORC low-level cache should use direct allocation."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", false,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 5a790e7..d5e950d 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -276,7 +276,7 @@
<property>
- <name>hive.llap.io.cache.direct</name>
+ <name>hive.llap.io.allocator.direct</name>
<value>false</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index f768601..c2bef58 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -264,7 +264,7 @@
</property>
<property>
- <name>hive.llap.io.cache.direct</name>
+ <name>hive.llap.io.allocator.direct</name>
<value>false</value>
</property>
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 0c96efa..ab4df5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -46,11 +46,11 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private static final int MAX_ARENA_SIZE = 1024*1024*1024;
public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
LlapDaemonCacheMetrics metrics) {
- isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
- minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
- maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
- int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
- long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+ minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC);
+ int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT);
+ long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
if (LlapIoImpl.LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
new file mode 100644
index 0000000..a441dc4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+/**
+ * The buffer usage (refcount) tracker. Buffer usage can either be tracked by a cache, which
+ * will handle the coordination with eviction, or by a simple refcount manager to facilitate
+ * sharing when the buffer contains data for multiple readers.
+ */
+public interface BufferUsageManager {
+ Allocator getAllocator();
+
+ void decRefBuffer(MemoryBuffer buffer);
+
+ void decRefBuffers(List<MemoryBuffer> buffers);
+
+ boolean incRefBuffer(MemoryBuffer buffer);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index b17edb5..17d9fdf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.llap.cache;
-import java.util.List;
-
-import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -62,15 +59,4 @@ public interface LowLevelCache {
*/
long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
-
- Allocator getAllocator();
-
- /**
- * Releases the buffer returned by getFileData.
- */
- void releaseBuffer(MemoryBuffer buffer);
-
- void releaseBuffers(List<MemoryBuffer> cacheBuffers);
-
- boolean reuseBuffer(MemoryBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index c2a130a..1132171 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import com.google.common.annotations.VisibleForTesting;
-public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
+public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
private final EvictionAwareAllocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -334,12 +334,12 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public void releaseBuffer(MemoryBuffer buffer) {
+ public void decRefBuffer(MemoryBuffer buffer) {
unlockBuffer((LlapDataBuffer)buffer, true);
}
@Override
- public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
for (MemoryBuffer b : cacheBuffers) {
unlockBuffer((LlapDataBuffer)b, true);
}
@@ -502,7 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public boolean reuseBuffer(MemoryBuffer buffer) {
+ public boolean incRefBuffer(MemoryBuffer buffer) {
// notifyReused implies that buffer is already locked; it's also called once for new
// buffers that are not cached yet. Don't notify cache policy.
return lockBuffer(((LlapDataBuffer)buffer), false);
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 8788e15..992da8e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -39,13 +39,14 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor,
LlapDaemonCacheMetrics metrics) {
- this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
this.evictor = evictor;
this.usedMemory = new AtomicLong(0);
this.metrics = metrics;
metrics.setCacheCapacityTotal(maxSize);
if (LlapIoImpl.LOGL.isInfoEnabled()) {
- LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
+ LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and "
+ + ((evictor == null) ? "no " : "") + "ability to evict blocks");
}
}
@@ -65,6 +66,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
}
continue;
}
+ if (evictor == null) return false;
// TODO: for one-block case, we could move notification for the last block out of the loop.
long evicted = evictor.evictSomeBlocks(remainingToReserve);
if (evicted == 0) {
@@ -107,6 +109,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
@Override
public void forceReservedMemory(int memoryToEvict) {
+ if (evictor == null) return;
while (memoryToEvict > 0) {
long evicted = evictor.evictSomeBlocks(memoryToEvict);
if (evicted == 0) return;
@@ -126,6 +129,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
@Override
public String debugDumpForOom() {
+ if (evictor == null) return null;
return "cache state\n" + evictor.debugDumpForOom();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 40cb92d..84910d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -67,8 +67,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private LlapOomDebugDump parentDebugDump;
public LowLevelLrfuCachePolicy(Configuration conf) {
- long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
int maxHeapSize = -1;
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
new file mode 100644
index 0000000..526ff22
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import sun.misc.Cleaner;
+
+public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
+ private final boolean isDirect;
+ private static Field cleanerField;
+ static {
+ ByteBuffer tmp = ByteBuffer.allocateDirect(1);
+ try {
+ cleanerField = tmp.getClass().getDeclaredField("cleaner");
+ cleanerField.setAccessible(true);
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
+ cleanerField = null;
+ }
+ }
+
+ public SimpleAllocator(Configuration conf) {
+ isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+ if (LlapIoImpl.LOG.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("Simple allocator with " + (isDirect ? "direct" : "byte") + " buffers");
+ }
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size) {
+ for (int i = 0; i < dest.length; ++i) {
+ LlapDataBuffer buf = null;
+ if (dest[i] == null) {
+ dest[i] = buf = createUnallocated();
+ } else {
+ buf = (LlapDataBuffer)dest[i];
+ }
+ ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ buf.initialize(0, bb, 0, size);
+ }
+ }
+
+ @Override
+ public void deallocate(MemoryBuffer buffer) {
+ LlapDataBuffer buf = (LlapDataBuffer)buffer;
+ ByteBuffer bb = buf.byteBuffer;
+ buf.byteBuffer = null;
+ Field field = cleanerField;
+ if (field == null) return;
+ try {
+ ((Cleaner)field.get(bb)).clean();
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.warn("Error using DirectByteBuffer cleaner; stopping its use", t);
+ cleanerField = null;
+ }
+ }
+
+ @Override
+ public boolean isDirectAlloc() {
+ return isDirect;
+ }
+
+ @Override
+ public LlapDataBuffer createUnallocated() {
+ return new LlapDataBuffer();
+ }
+
+ // BuddyAllocatorMXBean
+ @Override
+ public boolean getIsDirect() {
+ return isDirect;
+ }
+
+ @Override
+ public int getMinAllocation() {
+ return 0;
+ }
+
+ @Override
+ public int getMaxAllocation() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int getArenaSize() {
+ return -1;
+ }
+
+ @Override
+ public long getMaxCacheSize() {
+ return Integer.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
new file mode 100644
index 0000000..734a5c0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+
+public class SimpleBufferManager implements BufferUsageManager {
+ private final Allocator allocator;
+ private final LlapDaemonCacheMetrics metrics;
+
+ public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) {
+ if (LlapIoImpl.LOGL.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("Simple buffer manager");
+ }
+ this.allocator = allocator;
+ this.metrics = metrics;
+ }
+
+ private boolean lockBuffer(LlapDataBuffer buffer) {
+ int rc = buffer.incRef();
+ if (rc <= 0) return false;
+ metrics.incrCacheNumLockedBuffers();
+ return true;
+ }
+
+ private void unlockBuffer(LlapDataBuffer buffer) {
+ if (buffer.decRef() == 0) {
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+ }
+ allocator.deallocate(buffer);
+ }
+ metrics.decrCacheNumLockedBuffers();
+ }
+
+ @Override
+ public void decRefBuffer(MemoryBuffer buffer) {
+ unlockBuffer((LlapDataBuffer)buffer);
+ }
+
+ @Override
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
+ unlockBuffer((LlapDataBuffer)b);
+ }
+ }
+
+ @Override
+ public boolean incRefBuffer(MemoryBuffer buffer) {
+ return lockBuffer((LlapDataBuffer)buffer);
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return allocator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 8e5377f..4b330f4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -145,7 +145,7 @@ public class LlapServiceDriver {
Preconditions.checkArgument(options.getXmx() < options.getSize(),
"Working memory has to be smaller than the container sizing");
}
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
"Working memory + cache has to be smaller than the containing sizing ");
}
@@ -165,7 +165,7 @@ public class LlapServiceDriver {
}
if (options.getCache() != -1) {
- conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, options.getCache());
+ conf.setLong(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, options.getCache());
}
if (options.getXmx() != -1) {
@@ -278,11 +278,11 @@ public class LlapServiceDriver {
configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
- configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
- HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+ configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+ HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
- configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+ configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 467ab71..b3057c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -290,14 +290,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
- long cacheMemoryBytes =
- HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- boolean isDirectCache =
- HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
+
+ long ioMemoryBytes = HiveConf.getLongVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+ boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
- llapDaemon =
- new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
- cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+ llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled,
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index b38f472..d2c1907 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,8 +28,10 @@ import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
@@ -38,7 +40,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
-import org.apache.hadoop.hive.llap.cache.NoopCache;
+import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
+import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
@@ -59,19 +62,21 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class);
public static final LogLevels LOGL = new LogLevels(LOG);
+ private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
private final ColumnVectorProducer cvp;
private final ListeningExecutorService executor;
private LlapDaemonCacheMetrics cacheMetrics;
private LlapDaemonQueueMetrics queueMetrics;
private ObjectName buddyAllocatorMXBean;
- private EvictionAwareAllocator allocator;
+ private Allocator allocator;
private LlapIoImpl(Configuration conf) throws IOException {
- boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
- // High-level cache not supported yet.
+ String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
+ boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
+ useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
if (LOGL.isInfoEnabled()) {
- LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : ""));
+ LOG.info("Initializing LLAP IO in " + ioMode + " mode");
}
String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
@@ -86,35 +91,47 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
LOG.info("Started llap daemon metrics with displayName: " + displayName +
" sessionId: " + sessionId);
- Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
- boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
- LowLevelCachePolicy cachePolicy =
- useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
- LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
- conf, cachePolicy, cacheMetrics);
- // Memory manager uses cache policy to trigger evictions.
- OrcMetadataCache metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+ Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
+
+ OrcMetadataCache metadataCache = null;
LowLevelCacheImpl orcCache = null;
+ BufferUsageManager bufferManager = null;
if (useLowLevelCache) {
- // Allocator uses memory manager to request memory.
- allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
- // Cache uses allocator to allocate and deallocate.
+ // Memory manager uses cache policy to trigger evictions, so create the policy first.
+ boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
+ LowLevelCachePolicy cachePolicy =
+ useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+ // Allocator uses memory manager to request memory, so create the manager next.
+ LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+ conf, cachePolicy, cacheMetrics);
+ // Cache uses allocator to allocate and deallocate, create allocator and then caches.
+ EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ this.allocator = allocator;
orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+ metadataCache = new OrcMetadataCache(memManager, cachePolicy);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
cachePolicy.setParentDebugDumper(orcCache);
- orcCache.init();
+ orcCache.init(); // Start the cache threads.
+ bufferManager = orcCache; // Cache also serves as buffer manager.
} else {
- cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));
+ if (useAllocOnly) {
+ LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+ conf, null, cacheMetrics);
+ allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ } else {
+ allocator = new SimpleAllocator(conf);
+ }
+ bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
}
- // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?)
+ // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
// TODO: this should depends on input format and be in a map, or something.
- this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics,
- queueMetrics);
+ this.cvp = new OrcColumnVectorProducer(
+ metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
if (LOGL.isInfoEnabled()) {
LOG.info("LLAP IO initialized");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index e156eaa..18191da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -42,20 +43,22 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
private final OrcMetadataCache metadataCache;
private final Cache<OrcCacheKey> cache;
private final LowLevelCache lowLevelCache;
+ private final BufferUsageManager bufferManager;
private final Configuration conf;
private boolean _skipCorrupt; // TODO: get rid of this
private LlapDaemonCacheMetrics cacheMetrics;
private LlapDaemonQueueMetrics queueMetrics;
public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
- LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
- LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
+ LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+ Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
if (LlapIoImpl.LOGL.isInfoEnabled()) {
LlapIoImpl.LOG.info("Initializing ORC column vector producer");
}
this.metadataCache = metadataCache;
this.lowLevelCache = lowLevelCache;
+ this.bufferManager = bufferManager;
this.cache = cache;
this.conf = conf;
this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
@@ -71,8 +74,8 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, queueMetrics);
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
- conf, split, columnIds, sarg, columnNames, edc, counters);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+ metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
edc.init(reader, reader);
return edc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 58d2ac8..3ddfc29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -133,6 +134,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
+ private final BufferUsageManager bufferManager;
private final Configuration conf;
private final Cache<OrcCacheKey> cache;
private final FileSplit split;
@@ -160,12 +162,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@SuppressWarnings("unused")
private volatile boolean isPaused = false;
- public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
- OrcMetadataCache metadataCache, Configuration conf, FileSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
+ Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
+ FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
+ this.bufferManager = bufferManager;
this.cache = cache;
this.conf = conf;
this.split = split;
@@ -382,7 +385,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if (stripeMetadatas != null) {
stripeMetadata = stripeMetadatas.get(stripeIxMod);
} else {
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
}
@@ -394,7 +397,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
stripeMetadata = new OrcStripeMetadata(
stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
if (DebugUtils.isTraceOrcEnabled()) {
LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -510,11 +513,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
- int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
- + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory");
+ + HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.toString() + " to avoid wasting memory");
}
}
@@ -617,18 +620,20 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
*/
private OrcFileMetadata getOrReadFileMetadata() throws IOException {
OrcFileMetadata metadata = null;
- if (fileId != null) {
+ if (fileId != null && metadataCache != null) {
metadata = metadataCache.getFileMetadata(fileId);
if (metadata != null) {
counters.incrCounter(Counter.METADATA_CACHE_HIT);
return metadata;
+ } else {
+ counters.incrCounter(Counter.METADATA_CACHE_MISS);
}
- counters.incrCounter(Counter.METADATA_CACHE_MISS);
}
ensureOrcReader();
// We assume this call doesn't touch HDFS because everything is already read; don't add time.
metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
- return (fileId == null) ? metadata : metadataCache.putFileMetadata(metadata);
+ if (fileId == null || metadataCache == null) return metadata;
+ return metadataCache.putFileMetadata(metadata);
}
/**
@@ -643,7 +648,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
OrcStripeMetadata value = null;
int stripeIx = stripeIxMod + stripeIxFrom;
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
value = metadataCache.getStripeMetadata(stripeKey);
}
@@ -655,7 +660,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
long startTime = counters.startTimeCounter();
value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
- if (hasFileId) {
+ if (hasFileId && metadataCache != null) {
value = metadataCache.putStripeMetadata(value);
if (DebugUtils.isTraceOrcEnabled()) {
LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -701,7 +706,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
}
}
- lowLevelCache.releaseBuffers(data.getCacheBuffers());
+ bufferManager.decRefBuffers(data.getCacheBuffers());
CSD_POOL.offer(data);
}
}
@@ -939,7 +944,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public DataWrapperForOrc() {
boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
- if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
+ if (useZeroCopy && !getAllocator().isDirectAlloc()) {
throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
+ "buffers; either disable zero-copy or enable direct cache allocation");
}
@@ -949,30 +954,31 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public DiskRangeList getFileData(long fileId, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
- return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+ return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
+ fileId, range, baseOffset, factory, counters, gotAllData);
}
@Override
public long[] putFileData(long fileId, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
- return lowLevelCache.putFileData(
+ return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
}
@Override
public void releaseBuffer(MemoryBuffer buffer) {
- lowLevelCache.releaseBuffer(buffer);
+ bufferManager.decRefBuffer(buffer);
}
@Override
public void reuseBuffer(MemoryBuffer buffer) {
- boolean isReused = lowLevelCache.reuseBuffer(buffer);
+ boolean isReused = bufferManager.incRefBuffer(buffer);
assert isReused;
}
@Override
public Allocator getAllocator() {
- return lowLevelCache.getAllocator();
+ return bufferManager.getAllocator();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6375996..fc014a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -282,10 +282,10 @@ public class TestBuddyAllocator {
private Configuration createConf(int min, int max, int arena, int total) {
Configuration conf = new Configuration();
- conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
- conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
- conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
- conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_MAX_ALLOC.varname, max);
+ conf.setInt(ConfVars.LLAP_ALLOCATOR_ARENA_COUNT.varname, total/arena);
+ conf.setLong(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, total);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 8324b21..0846db9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -348,7 +348,7 @@ public class TestLowLevelCacheImpl {
++gets;
LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
- cache.releaseBuffer(result);
+ cache.decRefBuffer(result);
iter = iter.next;
}
} else {
@@ -378,7 +378,7 @@ public class TestLowLevelCacheImpl {
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
}
maskVal >>= 1;
- cache.releaseBuffer(buf);
+ cache.decRefBuffer(buf);
}
}
}
@@ -411,7 +411,7 @@ public class TestLowLevelCacheImpl {
DiskRange r = results[index];
if (r instanceof CacheChunk) {
LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
- cache.releaseBuffer(result);
+ cache.decRefBuffer(result);
if (victim == null && result.invalidate()) {
++evictions;
victim = result;
http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index d0abfa3..46e9547 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -145,8 +145,8 @@ public class TestLowLevelLrfuCachePolicy {
private Configuration createConf(int min, int heapSize, Double lambda) {
Configuration conf = new Configuration();
- conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
- conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize);
+ conf.setInt(HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+ conf.setInt(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, heapSize);
if (lambda != null) {
conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, lambda.doubleValue());
}