You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2016/05/25 21:04:01 UTC

hive git commit: HIVE-13029: NVDIMM support for LLAP Cache (Gopal V, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master ebc5e6a7b -> dee455229


HIVE-13029: NVDIMM support for LLAP Cache (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dee45522
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dee45522
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dee45522

Branch: refs/heads/master
Commit: dee45522934ae8eb9fbfdbd6df00fcc97377faed
Parents: ebc5e6a
Author: Gopal V <go...@apache.org>
Authored: Wed May 25 14:03:48 2016 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed May 25 14:03:48 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  8 +++
 .../org/apache/hadoop/hive/conf/Validator.java  | 29 ++++++++
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 75 ++++++++++++++++++--
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 15 ++--
 .../hive/llap/cache/TestBuddyAllocator.java     | 46 ++++++++----
 5 files changed, 150 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ed20069..3e295fe 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.conf;
 
 import com.google.common.base.Joiner;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.conf.Validator.RatioValidator;
 import org.apache.hadoop.hive.conf.Validator.SizeValidator;
 import org.apache.hadoop.hive.conf.Validator.StringSet;
 import org.apache.hadoop.hive.conf.Validator.TimeValidator;
+import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
@@ -2630,6 +2632,12 @@ public class HiveConf extends Configuration {
         "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
     LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
         "Whether ORC low-level cache should use direct allocation."),
+    LLAP_ALLOCATOR_MAPPED("hive.llap.io.allocator.mmap", false,
+        "Whether ORC low-level cache should use memory mapped allocation (direct I/O). \n" +
+        "This is recommended to be used along-side NVDIMM (DAX) or NVMe flash storage."),
+    LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/tmp",
+        new WritableDirectoryValidator(),
+        "The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
     LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,

http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/common/src/java/org/apache/hadoop/hive/conf/Validator.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Validator.java b/common/src/java/org/apache/hadoop/hive/conf/Validator.java
index bb8962a..00e7b28 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Validator.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Validator.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.conf;
 
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -346,4 +349,38 @@ public interface Validator {
       return current > 0 ? ((long)(size / current) + "Pb") : (size + units[0]);
     }
   }
+
+  /**
+   * Accepts a value only if it names an existing, writable directory on the default (local)
+   * filesystem. {@link #validate} returns {@code null} when valid, otherwise the reason.
+   */
+  public class WritableDirectoryValidator implements Validator {
+
+    @Override
+    public String validate(String value) {
+      if (value == null) {
+        return "Path provided could not be located: no value was given.";
+      }
+      final Path path;
+      try {
+        // getPath() never returns null: it throws InvalidPathException on malformed input.
+        path = FileSystems.getDefault().getPath(value);
+      } catch (java.nio.file.InvalidPathException e) {
+        return String.format("Path '%s' provided is not a valid path: %s", value, e.getReason());
+      }
+      if (!Files.isDirectory(path)) {
+        return String.format("Path '%s' provided is not a directory.", value);
+      }
+      if (!Files.isWritable(path)) {
+        return String.format("Path '%s' provided is not writable.", value);
+      }
+      return null;
+    }
+
+    @Override
+    public String toDescription() {
+      return "Expects a writable directory on the local filesystem";
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 1d5a7db..47325ad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -18,9 +18,23 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,6 +53,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private final int minAllocation, maxAllocation, arenaSize;
   private final long maxSize;
   private final boolean isDirect;
+  private final boolean isMapped;
+  private final Path cacheDir;
   private final LlapDaemonCacheMetrics metrics;
 
   // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
@@ -47,13 +63,21 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   // Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
   private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
 
+  // Owner-only permissions for the temporary directory that holds the memory-mapped arena files
+  // (created via Files.createTempDirectory when hive.llap.io.allocator.mmap is enabled).
+  private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
+      .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
+
 
   public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
     this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED), // isMapped
         (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
         (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
         HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
-        getMaxTotalMemorySize(conf), mm, metrics);
+        getMaxTotalMemorySize(conf),
+        HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH), // mapPath; used only if mapped
+        mm, metrics);
   }
 
   private static long getMaxTotalMemorySize(Configuration conf) {
@@ -71,14 +95,35 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   @VisibleForTesting
   public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount,
       long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) {
+    this(isDirectVal, /* isMapped= */ false, minAllocVal, maxAllocVal, arenaCount, maxSizeVal,
+        /* mapPath= */ null, memoryManager, metrics); // never memory mapped, so no directory
+  }
+
+  @VisibleForTesting
+  public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
+      int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager,
+      LlapDaemonCacheMetrics metrics) {
     isDirect = isDirectVal;
+    isMapped = isMappedVal;
     minAllocation = minAllocVal;
     maxAllocation = maxAllocVal;
+    if (isMapped) {
+      try {
+        cacheDir =
+            Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
+      } catch (IOException ioe) {
+        // conf validator already checks this, so it will never trigger usually
+        throw new AssertionError("Configured mmap directory should be writable", ioe);
+      }
+    } else {
+      cacheDir = null;
+    }
     int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
     arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
     if (LlapIoImpl.LOG.isInfoEnabled()) {
-      LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte")
-          + " buffers; allocation sizes " + minAllocation + " - " + maxAllocation
+      LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") + " buffers;"
+          + (isMapped ? (" memory mapped off " + cacheDir.toString() + "; ") : "")
+          + "allocation sizes " + minAllocation + " - " + maxAllocation
           + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal);
     }
 
@@ -288,6 +333,45 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     return maxSize;
   }
 
+  /**
+   * Allocates the backing buffer for one arena: a memory-mapped temp file under {@code cacheDir}
+   * when {@code isMapped}, otherwise a direct or heap buffer depending on {@code isDirect}.
+   *
+   * @param arenaSize size of the arena, in bytes
+   * @return a buffer with capacity {@code arenaSize}
+   * @throws OutOfMemoryError if the memory-mapped arena file cannot be created or mapped
+   */
+  private ByteBuffer preallocate(int arenaSize) {
+    if (isMapped) {
+      Preconditions.checkArgument(isDirect, "All memory mapped allocations have to be direct buffers");
+      File rf = null;
+      try {
+        rf = File.createTempFile("arena-", ".cache", cacheDir.toFile());
+        // try-with-resources closes the file even if setLength() or map() fails; a mapping, once
+        // established, is not dependent upon the file channel that was used to create it.
+        try (RandomAccessFile rwf = new RandomAccessFile(rf, "rw")) {
+          rwf.setLength(arenaSize); // truncate (TODO: posix_fallocate?)
+          // READ_WRITE, not PRIVATE: writes to a private (copy-on-write) mapping never reach the
+          // file, so (on Linux) each written page would become anonymous process memory.
+          return rwf.getChannel().map(MapMode.READ_WRITE, 0, arenaSize);
+        }
+      } catch (IOException ioe) {
+        LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe);
+        // fail similarly when memory allocations fail
+        OutOfMemoryError oom = new OutOfMemoryError(
+            "Failed trying to allocate memory mapped arena: " + ioe.getMessage());
+        oom.initCause(ioe);
+        throw oom;
+      } finally {
+        // Always remove the backing file (also on failure); an established map stays usable.
+        if (rf != null) {
+          rf.delete();
+        }
+      }
+    }
+    return isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize);
+  }
+
   private class Arena {
     private ByteBuffer data;
     // Avoid storing headers with data since we expect binary size allocations.
@@ -297,7 +364,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
     void init() {
       try {
-        data = isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize);
+        data = preallocate(arenaSize);
       } catch (OutOfMemoryError oom) {
         throw new OutOfMemoryError("Cannot allocate " + arenaSize + " bytes: " + oom.getMessage()
             + "; make sure your xmx and process size are set correctly.");

http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index c8fddb1..dc80992 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -200,16 +200,23 @@ public class LlapServiceDriver {
 
     if (options.getSize() != -1) {
       if (options.getCache() != -1) {
-        Preconditions.checkArgument(options.getCache() < options.getSize(),
-            "Cache has to be smaller than the container sizing");
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
+          // direct heap allocations need to be safer
+          Preconditions.checkArgument(options.getCache() < options.getSize(),
+              "Cache has to be smaller than the container sizing");
+        } else if (options.getCache() < options.getSize()) {
+          LOG.warn("Note that this might need YARN physical memory monitoring to be turned off (yarn.nodemanager.pmem-check-enabled=false)");
+        }
       }
       if (options.getXmx() != -1) {
         Preconditions.checkArgument(options.getXmx() < options.getSize(),
             "Working memory has to be smaller than the container sizing");
       }
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
+          && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+        // direct and not memory mapped
         Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
-            "Working memory + cache has to be smaller than the containing sizing ");
+            "Working memory + cache has to be smaller than the container sizing ");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dee45522/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 7b04103..345f5b1 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -18,8 +18,9 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -27,18 +28,33 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class TestBuddyAllocator {
   private static final Logger LOG = LoggerFactory.getLogger(TestBuddyAllocator.class);
   private final Random rdm = new Random(2284);
+  private final boolean isDirect;
+  private final boolean isMapped;
+  private final String tmpDir = System.getProperty("java.io.tmpdir", ".");
+
+  @Parameters // each row is {isDirect, isMapped}; the mapped allocator requires direct buffers
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { {false, false}, {true, false}, {true, true} });
+  }
+
+  public TestBuddyAllocator(boolean direct, boolean mmap) {
+    this.isDirect = direct;
+    this.isMapped = mmap;
+  }
 
   private static class DummyMemoryManager implements MemoryManager {
     @Override
@@ -78,8 +94,8 @@ public class TestBuddyAllocator {
   @Test
   public void testSameSizes() throws Exception {
     int min = 3, max = 8, maxAlloc = 1 << max;
-    BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc, maxAlloc,
-        new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc,
+        tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
     for (int i = max; i >= min; --i) {
       allocSameSize(a, 1 << (max - i), i);
     }
@@ -88,16 +104,16 @@ public class TestBuddyAllocator {
   @Test
   public void testMultipleArenas() throws Exception {
     int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
-    BuddyAllocator a = new BuddyAllocator(false, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount,
-        new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc,
+        maxAlloc * arenaCount, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
     allocSameSize(a, arenaCount * 2, allocLog2);
   }
 
   @Test
   public void testMTT() {
     final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
-    final BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc * 8,
-        maxAlloc * 24, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc * 8,
+        maxAlloc * 24, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
     ExecutorService executor = Executors.newFixedThreadPool(3);
     final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
     FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
@@ -140,8 +156,8 @@ public class TestBuddyAllocator {
   @Test
   public void testMTTArenas() {
     final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
-    final BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, maxAlloc,
-        (1 << min) * minAllocCount, new DummyMemoryManager(),
+    final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
+        (1 << min) * minAllocCount, tmpDir, new DummyMemoryManager(),
         LlapDaemonCacheMetrics.create("test", "1"));
     ExecutorService executor = Executors.newFixedThreadPool(threadCount);
     final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
@@ -180,8 +196,8 @@ public class TestBuddyAllocator {
   private void testVariableSizeInternal(
       int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
     int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
-    BuddyAllocator a = new BuddyAllocator(false, 1 << min, maxAlloc, arenaSize,
-        arenaSize * arenaCount, new DummyMemoryManager(),
+    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, arenaSize,
+        arenaSize * arenaCount, tmpDir , new DummyMemoryManager(),
         LlapDaemonCacheMetrics.create("test", "1"));
     allocateUp(a, min, max, allocCount, true);
     allocateDown(a, min, max, allocCount, true);