You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/15 04:34:54 UTC

hive git commit: HIVE-20649: LLAP aware memory manager for Orc writers (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/branch-3 2048f6262 -> 1ce6c7c2a


HIVE-20649: LLAP aware memory manager for Orc writers (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1ce6c7c2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1ce6c7c2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1ce6c7c2

Branch: refs/heads/branch-3
Commit: 1ce6c7c2a3a93f0f92078ba8c929a870eaa8134d
Parents: 2048f62
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Sun Oct 14 21:34:08 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Sun Oct 14 21:34:42 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 ++
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   | 48 +++++++++++++++++++-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java      | 41 +++++++++++++++++
 3 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3852d79..a04ef38 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1923,6 +1923,10 @@ public class HiveConf extends Configuration {
         " ETL strategy is used when spending little more time in split generation is acceptable" +
         " (split generation reads and caches file footers). HYBRID chooses between the above strategies" +
         " based on heuristics."),
+    HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED("hive.exec.orc.writer.llap.memory.manager.enabled", true,
+      "Whether orc writers should use llap-aware memory manager. LLAP aware memory manager will use memory\n" +
+        "per executor instead of entire heap memory when concurrent orc writers are involved. This will let\n" +
+        "task fragments to use memory within its limit (memory per executor) when performing ETL in LLAP."),
 
     // hive streaming ingest settings
     HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index e7dfb05..e246ac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -24,20 +24,29 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
+import org.apache.orc.OrcConf;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManagerImpl;
 import org.apache.orc.impl.OrcTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Contains factory methods to read or write ORC files.
  */
 public final class OrcFile extends org.apache.orc.OrcFile {
-
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class);
   // unused
   protected OrcFile() {}
 
@@ -96,6 +105,37 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     return new ReaderImpl(path, options);
   }
 
+  @VisibleForTesting
+  static class LlapAwareMemoryManager extends MemoryManagerImpl { // memory manager whose reported pool is derived from LLAP per-executor memory
+    private final double maxLoad; // value of OrcConf.MEMORY_POOL read from the supplied conf
+    private final long totalMemoryPool; // memPerExecutor * maxLoad, truncated to long; computed once in the constructor
+
+    public LlapAwareMemoryManager(Configuration conf) {
+      super(conf);
+      maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+      long memPerExecutor = LlapDaemonInfo.INSTANCE.getMemoryPerExecutor(); // NOTE(review): presumably requires LlapDaemonInfo to have been initialized beforehand - confirm
+      totalMemoryPool = (long) (memPerExecutor * maxLoad);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using LLAP memory manager for orc writer. memPerExecutor: {} maxLoad: {} totalMemPool: {}",
+          LlapUtil.humanReadableByteCount(memPerExecutor), maxLoad, LlapUtil.humanReadableByteCount(totalMemoryPool));
+      }
+    }
+
+    @Override
+    public long getTotalMemoryPool() { // overrides the parent to return the pool size computed in the constructor
+      return totalMemoryPool;
+    }
+  }
+
+  private static ThreadLocal<MemoryManager> threadLocalOrcLlapMemoryManager = null; // created on the first call to the accessor below
+
+  private static synchronized MemoryManager getThreadLocalOrcLlapMemoryManager(final Configuration conf) { // returns the calling thread's LlapAwareMemoryManager
+    if (threadLocalOrcLlapMemoryManager == null) {
+      threadLocalOrcLlapMemoryManager = ThreadLocal.withInitial(() -> new LlapAwareMemoryManager(conf)); // NOTE(review): the initializer captures the conf of the first call; managers created later for other threads reuse that conf
+    }
+    return threadLocalOrcLlapMemoryManager.get(); // first get() on a thread runs the initializer above
+  }
+
   /**
    * Options for creating ORC file writers.
    */
@@ -111,6 +151,10 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     WriterOptions(Properties tableProperties, Configuration conf) { // builds writer options from table properties and conf
       super(tableProperties, conf);
       useUTCTimestamp(true);
+      if (conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, true) && // flag is treated as true when absent from conf
+        LlapProxy.isDaemon()) { // only swap the memory manager when LlapProxy reports daemon mode
+        memory(getThreadLocalOrcLlapMemoryManager(conf)); // install the per-thread LlapAwareMemoryManager
+      }
     }
 
    /**

http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index 97d4fc6..2931c04 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertNull;
 import static junit.framework.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -43,6 +45,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -91,6 +96,7 @@ import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManagerImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -2202,4 +2208,39 @@ public class TestOrcFile {
     assertEquals(false, reader.hasNext());
     reader.close();
   }
+
+  @Test
+  public void testLlapAwareMemoryManager() throws IOException { // checks which MemoryManager class WriterOptions selects in three configurations
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    try {
+      OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); // case 1: this test has not yet called LlapProxy.setDaemon(true)
+      Writer writer = OrcFile.createWriter(new Path(testFilePath, "-0"), opts);
+      writer.close();
+      assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class); // NOTE(review): arguments are (actual, expected); JUnit convention is expected first
+
+      conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
+      LlapDaemonInfo.initialize("test", new Configuration()); // must precede the next writerOptions call, which reads LlapDaemonInfo.INSTANCE
+      LlapProxy.setDaemon(true); // case 2: daemon flag on, manager flag left at its value in conf
+      opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB);
+      writer = OrcFile.createWriter(new Path(testFilePath, "-1"), opts);
+      writer.close();
+      assertEquals(opts.getMemoryManager().getClass(), OrcFile.LlapAwareMemoryManager.class);
+      assertEquals(LlapDaemonInfo.INSTANCE.getMemoryPerExecutor() * 0.5, // NOTE(review): assumes OrcConf.MEMORY_POOL resolves to 0.5 for this conf - confirm
+        ((OrcFile.LlapAwareMemoryManager) opts.getMemoryManager()).getTotalMemoryPool(), 100); // compared as doubles with a tolerance of 100
+
+      conf.setBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, false); // case 3: daemon flag still on, manager flag explicitly off
+      opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB);
+      writer = OrcFile.createWriter(new Path(testFilePath, "-2"), opts);
+      writer.close();
+      assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class);
+    } finally {
+      LlapProxy.setDaemon(false); // undo the static daemon flag set above
+      conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "container"); // NOTE(review): the manager-enabled flag set to false above is not reset here - confirm conf is rebuilt per test
+    }
+  }
 }