You are viewing a plain-text version of this content. The canonical link for it is a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/26 15:40:54 UTC
[32/66] [abbrv] hive git commit: HIVE-13029: NVDIMM support for LLAP Cache (Gopal V, reviewed by Sergey Shelukhin)
HIVE-13029: NVDIMM support for LLAP Cache (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dee45522
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dee45522
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dee45522
Branch: refs/heads/java8
Commit: dee45522934ae8eb9fbfdbd6df00fcc97377faed
Parents: ebc5e6a
Author: Gopal V <go...@apache.org>
Authored: Wed May 25 14:03:48 2016 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed May 25 14:03:48 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 8 +++
.../org/apache/hadoop/hive/conf/Validator.java | 29 ++++++++
.../hadoop/hive/llap/cache/BuddyAllocator.java | 75 ++++++++++++++++++--
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 15 ++--
.../hive/llap/cache/TestBuddyAllocator.java | 46 ++++++++----
5 files changed, 150 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ed20069..3e295fe 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.conf;
import com.google.common.base.Joiner;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.conf.Validator.RatioValidator;
import org.apache.hadoop.hive.conf.Validator.SizeValidator;
import org.apache.hadoop.hive.conf.Validator.StringSet;
import org.apache.hadoop.hive.conf.Validator.TimeValidator;
+import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
@@ -2630,6 +2632,12 @@ public class HiveConf extends Configuration {
"Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
"Whether ORC low-level cache should use direct allocation."),
+ LLAP_ALLOCATOR_MAPPED("hive.llap.io.allocator.mmap", false,
+ "Whether ORC low-level cache should use memory mapped allocation (direct I/O). \n" +
+ "This is recommended to be used along-side NVDIMM (DAX) or NVMe flash storage."),
+ LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/tmp",
+ new WritableDirectoryValidator(),
+ "The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,
http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/common/src/java/org/apache/hadoop/hive/conf/Validator.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Validator.java b/common/src/java/org/apache/hadoop/hive/conf/Validator.java
index bb8962a..00e7b28 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Validator.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Validator.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.conf;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -346,4 +349,30 @@ public interface Validator {
return current > 0 ? ((long)(size / current) + "Pb") : (size + units[0]);
}
}
+
+ public class WritableDirectoryValidator implements Validator {
+
+ @Override
+ public String validate(String value) {
+ final Path path = FileSystems.getDefault().getPath(value);
+ if (path == null && value != null) {
+ return String.format("Path '%s' provided could not be located.", value);
+ }
+ final boolean isDir = Files.isDirectory(path);
+ final boolean isWritable = Files.isWritable(path);
+ if (!isDir) {
+ return String.format("Path '%s' provided is not a directory.", value);
+ }
+ if (!isWritable) {
+ return String.format("Path '%s' provided is not writable.", value);
+ }
+ return null;
+ }
+
+ @Override
+ public String toDescription() {
+ return "Expects a writable directory on the local filesystem";
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 1d5a7db..47325ad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -18,9 +18,23 @@
package org.apache.hadoop.hive.llap.cache;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,6 +53,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private final int minAllocation, maxAllocation, arenaSize;
private final long maxSize;
private final boolean isDirect;
+ private final boolean isMapped;
+ private final Path cacheDir;
private final LlapDaemonCacheMetrics metrics;
// We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
@@ -47,13 +63,21 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+ private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
+ .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
+ private static final FileAttribute<Set<PosixFilePermission>> RW_ = PosixFilePermissions
+ .asFileAttribute(PosixFilePermissions.fromString("rw-------"));
+
public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
+ HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED),
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
- getMaxTotalMemorySize(conf), mm, metrics);
+ getMaxTotalMemorySize(conf),
+ HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
+ mm, metrics);
}
private static long getMaxTotalMemorySize(Configuration conf) {
@@ -71,14 +95,35 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
@VisibleForTesting
public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount,
long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) {
+ this(isDirectVal, false /*isMapped*/, minAllocVal, maxAllocVal, arenaCount, maxSizeVal,
+ null /* mapping path */, memoryManager, metrics);
+ }
+
+ @VisibleForTesting
+ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
+ int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager,
+ LlapDaemonCacheMetrics metrics) {
isDirect = isDirectVal;
+ isMapped = isMappedVal;
minAllocation = minAllocVal;
maxAllocation = maxAllocVal;
+ if (isMapped) {
+ try {
+ cacheDir =
+ Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
+ } catch (IOException ioe) {
+ // conf validator already checks this, so it will never trigger usually
+ throw new AssertionError("Configured mmap directory should be writable", ioe);
+ }
+ } else {
+ cacheDir = null;
+ }
int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
if (LlapIoImpl.LOG.isInfoEnabled()) {
- LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte")
- + " buffers; allocation sizes " + minAllocation + " - " + maxAllocation
+ LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") + " buffers;"
+ + (isMapped ? (" memory mapped off " + cacheDir.toString() + "; ") : "")
+ + "allocation sizes " + minAllocation + " - " + maxAllocation
+ ", arena size " + arenaSizeVal + ". total size " + maxSizeVal);
}
@@ -288,6 +333,28 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
return maxSize;
}
+  /**
+   * Allocates the backing buffer for a single arena.
+   *
+   * <p>When memory mapping is enabled, the arena is backed by a temporary file created under
+   * {@code cacheDir}. The file is unlinked as soon as the mapping exists (or on failure), so the
+   * storage lives only as long as the mapping. Otherwise a direct or heap buffer is allocated.
+   *
+   * @param arenaSize size of the arena in bytes
+   * @return a buffer of {@code arenaSize} bytes
+   * @throws OutOfMemoryError if the memory mapped arena cannot be created, mirroring how a
+   *     failed heap/direct allocation surfaces to callers
+   */
+  private ByteBuffer preallocate(int arenaSize) {
+    if (isMapped) {
+      Preconditions.checkArgument(isDirect, "All memory mapped allocations have to be direct buffers");
+      File rf = null;
+      try {
+        rf = File.createTempFile("arena-", ".cache", cacheDir.toFile());
+        // try-with-resources closes the file even when setLength/map fail. A mapping, once
+        // established, is not dependent upon the file channel that was used to create it.
+        try (RandomAccessFile rwf = new RandomAccessFile(rf, "rw")) {
+          rwf.setLength(arenaSize); // truncate (TODO: posix_fallocate?)
+          // READ_WRITE, not PRIVATE: a PRIVATE (copy-on-write) mapping never propagates writes
+          // to the file, so cached data would land in anonymous memory instead of the mapped
+          // NVDIMM/NVMe storage. Sharing is harmless because the file is deleted below.
+          return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize);
+        }
+      } catch (IOException ioe) {
+        LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe);
+        // fail similarly when memory allocations fail
+        OutOfMemoryError oom = new OutOfMemoryError(
+            "Failed trying to allocate memory mapped arena: " + ioe.getMessage());
+        oom.initCause(ioe);
+        throw oom;
+      } finally {
+        // delete file and hold onto the map; also cleans up the file if mapping failed
+        if (rf != null && !rf.delete()) {
+          LlapIoImpl.LOG.warn("Could not delete memory mapped arena file " + rf);
+        }
+      }
+    }
+    return isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize);
+  }
+
private class Arena {
private ByteBuffer data;
// Avoid storing headers with data since we expect binary size allocations.
@@ -297,7 +364,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
void init() {
try {
- data = isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize);
+ data = preallocate(arenaSize);
} catch (OutOfMemoryError oom) {
throw new OutOfMemoryError("Cannot allocate " + arenaSize + " bytes: " + oom.getMessage()
+ "; make sure your xmx and process size are set correctly.");
http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index c8fddb1..dc80992 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -200,16 +200,23 @@ public class LlapServiceDriver {
if (options.getSize() != -1) {
if (options.getCache() != -1) {
- Preconditions.checkArgument(options.getCache() < options.getSize(),
- "Cache has to be smaller than the container sizing");
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
+ // direct heap allocations need to be safer
+ Preconditions.checkArgument(options.getCache() < options.getSize(),
+ "Cache has to be smaller than the container sizing");
+ } else if (options.getCache() < options.getSize()) {
+ LOG.warn("Note that this might need YARN physical memory monitoring to be turned off (yarn.nodemanager.pmem-check-enabled=false)");
+ }
}
if (options.getXmx() != -1) {
Preconditions.checkArgument(options.getXmx() < options.getSize(),
"Working memory has to be smaller than the container sizing");
}
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
+ && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+ // direct and not memory mapped
Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
- "Working memory + cache has to be smaller than the containing sizing ");
+ "Working memory + cache has to be smaller than the container sizing ");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 7b04103..345f5b1 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hive.llap.cache;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -27,18 +28,33 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
public class TestBuddyAllocator {
private static final Logger LOG = LoggerFactory.getLogger(TestBuddyAllocator.class);
private final Random rdm = new Random(2284);
+ private final boolean isDirect;
+ private final boolean isMapped;
+ private final String tmpDir = System.getProperty("java.io.tmpdir", ".");
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { false, false }, { true, false }, { true, true } });
+ }
+
+ public TestBuddyAllocator(boolean direct, boolean mmap) {
+ isDirect = direct;
+ isMapped = mmap;
+ }
private static class DummyMemoryManager implements MemoryManager {
@Override
@@ -78,8 +94,8 @@ public class TestBuddyAllocator {
@Test
public void testSameSizes() throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max;
- BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc, maxAlloc,
- new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc,
+ tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
for (int i = max; i >= min; --i) {
allocSameSize(a, 1 << (max - i), i);
}
@@ -88,16 +104,16 @@ public class TestBuddyAllocator {
@Test
public void testMultipleArenas() throws Exception {
int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
- BuddyAllocator a = new BuddyAllocator(false, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount,
- new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount,
+ tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
allocSameSize(a, arenaCount * 2, allocLog2);
}
@Test
public void testMTT() {
final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
- final BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc * 8,
- maxAlloc * 24, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc * 8,
+ maxAlloc * 24, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
ExecutorService executor = Executors.newFixedThreadPool(3);
final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
@@ -140,8 +156,8 @@ public class TestBuddyAllocator {
@Test
public void testMTTArenas() {
final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
- final BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc,
- (1 << min) * minAllocCount, new DummyMemoryManager(),
+ final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
+ (1 << min) * minAllocCount, tmpDir, new DummyMemoryManager(),
LlapDaemonCacheMetrics.create("test", "1"));
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
@@ -180,8 +196,8 @@ public class TestBuddyAllocator {
private void testVariableSizeInternal(
int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
- BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, arenaSize,
- arenaSize * arenaCount, new DummyMemoryManager(),
+ BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, arenaSize,
+ arenaSize * arenaCount, tmpDir , new DummyMemoryManager(),
LlapDaemonCacheMetrics.create("test", "1"));
allocateUp(a, min, max, allocCount, true);
allocateDown(a, min, max, allocCount, true);