You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by gw...@apache.org on 2019/08/26 03:01:24 UTC

[incubator-nemo] branch master updated: [NEMO-397] Separation of JVM heap region and off-heap memory region (#236)

This is an automated email from the ASF dual-hosted git repository.

gwlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new bd70bb2  [NEMO-397] Separation of JVM heap region and off-heap memory region (#236)
bd70bb2 is described below

commit bd70bb2502c87ba485fdcef76314c9e97fa067cc
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Mon Aug 26 12:01:19 2019 +0900

    [NEMO-397] Separation of JVM heap region and off-heap memory region (#236)
    
    JIRA: [NEMO-397: Separation of JVM heap region and off-heap memory region](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-397)
    
    **Major changes:**
    - Off-heap memory ratio option added
    - Slack region reserved for the off-heap memory region by adding the off-heap ratio to the JVM heap slack
    
    Closes #236
---
 .../java/org/apache/nemo/client/JobLauncher.java   | 35 ++++++++++++++++++++--
 .../java/org/apache/nemo/conf/DataPlaneConf.java   | 10 ++++++-
 .../main/java/org/apache/nemo/conf/JobConf.java    | 13 ++++++--
 .../nemo/runtime/executor/data/BlockStoreTest.java |  4 +++
 .../executor/datatransfer/DataTransferTest.java    |  1 +
 5 files changed, 56 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index e9f3394..0211122 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.client;
 
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.SerializationUtils;
@@ -147,6 +149,8 @@ public final class JobLauncher {
       + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
     final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
       JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
+    final Configuration offheapMemoryConfig = getMemoryConf(builtJobConf, executorResourceConfig,
+      JobConf.ExecutorJSONContents.class, JobConf.MaxOffheapRatio.class, JobConf.MaxOffheapMb.class);
     final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
       JobConf.BandwidthJSONContents.class, "");
     final Configuration clientConf = getClientConf();
@@ -154,7 +158,8 @@ public final class JobLauncher {
 
     // Merge Job and Driver Confs
     jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfig,
-      executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
+      executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf,
+      offheapMemoryConfig);
 
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -418,7 +423,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
     cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class);
-    cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
+    cl.registerShortNameOfClass(JobConf.MaxOffheapRatio.class);
     cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
     cl.processCommandLine(args);
     return confBuilder.build();
@@ -441,7 +446,9 @@ public final class JobLauncher {
           .build();
       case "yarn":
         return YarnClientConfiguration.CONF
-          .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class))
+          .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class)
+            + injector.getNamedInstance(JobConf.MaxOffheapRatio.class))
+          // Off-heap memory ratio is added to memory slack so that JVM heap region does not invade the off-heap region.
           .build();
       default:
         throw new UnsupportedOperationException(deployMode);
@@ -476,6 +483,28 @@ public final class JobLauncher {
     }
   }
 
+  private static Configuration getMemoryConf(final Configuration jobConf,
+                                             final Configuration executorConf,
+                                             final Class<? extends Name<String>> contentsParameter,
+                                             final Class<? extends Name<Double>> offHeapRatio,
+                                             final Class<? extends Name<Integer>> maxOffHeapMb)
+    throws InjectionException {
+    final Injector injector = TANG.newInjector(Configurations.merge(jobConf, executorConf));
+    try {
+      final String contents = injector.getNamedInstance(contentsParameter);
+      final ObjectMapper objectMapper = new ObjectMapper();
+      final TreeNode jsonRootNode = objectMapper.readTree(contents);
+      final TreeNode resourceNode = jsonRootNode.get(0);
+      final int executorMemory = resourceNode.get("memory_mb").traverse().getIntValue();
+      final int offHeapMemory =  (int) (executorMemory * injector.getNamedInstance(offHeapRatio));
+      return TANG.newConfigurationBuilder()
+        .bindNamedParameter(maxOffHeapMb, String.valueOf(offHeapMemory))
+        .build();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Get the built job configuration.
    * It can be {@code null} if this method is not called by the process which called the main function of this class.
diff --git a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
index b48c92a..fe1d0df 100644
--- a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
@@ -37,6 +37,8 @@ public final class DataPlaneConf {
   private final int serverBackLog;
   private final int listenThreads;
   private final int workThreads;
+  private final int maxOffHeapMb;
+  private final int chunkSizeKb;
 
   @Inject
   private DataPlaneConf(@Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numIOThreads,
@@ -46,7 +48,9 @@ public final class DataPlaneConf {
                         @Parameter(JobConf.PartitionTransportClientNumThreads.class) final int clientNumThreads,
                         @Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBackLog,
                         @Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int listenThreads,
-                        @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads) {
+                        @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads,
+                        @Parameter(JobConf.MaxOffheapMb.class) final int maxOffHeapMb,
+                        @Parameter(JobConf.ChunkSizeKb.class) final int chunkSizeKb) {
     this.numIOThreads = numIOThreads;
     this.maxNumDownloads = maxNumDownloads;
     this.scheduleSerThread = scheduleSerThread;
@@ -55,6 +59,8 @@ public final class DataPlaneConf {
     this.serverBackLog = serverBackLog;
     this.listenThreads = listenThreads;
     this.workThreads = workThreads;
+    this.maxOffHeapMb = maxOffHeapMb;
+    this.chunkSizeKb = chunkSizeKb;
   }
 
   public Configuration getDataPlaneConfiguration() {
@@ -67,6 +73,8 @@ public final class DataPlaneConf {
       .bindNamedParameter(JobConf.PartitionTransportServerBacklog.class, Integer.toString(serverBackLog))
       .bindNamedParameter(JobConf.PartitionTransportServerNumListeningThreads.class, Integer.toString(listenThreads))
       .bindNamedParameter(JobConf.PartitionTransportServerNumWorkingThreads.class, Integer.toString(workThreads))
+      .bindNamedParameter(JobConf.MaxOffheapMb.class, Integer.toString(maxOffHeapMb))
+      .bindNamedParameter(JobConf.ChunkSizeKb.class, Integer.toString(chunkSizeKb))
       .build();
   }
  }
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index c93bc67..d12f191 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -300,11 +300,18 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
+   * Maximum off-heap memory ratio to the total memory in the executor.
+   */
+  @NamedParameter(doc = "The maximum ratio of off-heap memory size to the total memory size.",
+    short_name = "max_offheap_ratio", default_value = "0.2")
+  public final class MaxOffheapRatio implements Name<Double> {
+  }
+
+  /**
    * Maximum off-heap memory size in the executor.
+   * This is set by the system according to the off-heap ratio.
    */
-  // TODO #397: Separation of JVM heap region and off-heap memory region
-  @NamedParameter(doc = "The maximum off-heap memory that can be allocated",
-    short_name = "max_offheap_mb", default_value = "8192")
+  @NamedParameter(doc = "The maximum off-heap memory that can be allocated")
   public final class MaxOffheapMb implements Name<Integer> {
   }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
index 457960f..b8a148a 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
@@ -229,6 +229,7 @@ public final class BlockStoreTest {
   public void testMemoryStore() throws Exception {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+    injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
     final BlockStore memoryStore = injector.getInstance(MemoryStore.class);
     shuffle(memoryStore, memoryStore);
     concurrentRead(memoryStore, memoryStore);
@@ -244,6 +245,7 @@ public final class BlockStoreTest {
   public void testSerMemoryStore() throws Exception {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+    injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
     final BlockStore serMemoryStore = injector.getInstance(SerializedMemoryStore.class);
     shuffle(serMemoryStore, serMemoryStore);
     concurrentRead(serMemoryStore, serMemoryStore);
@@ -261,6 +263,7 @@ public final class BlockStoreTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.FileDirectory.class, TMP_FILE_DIRECTORY);
     injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+    injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
 
     final BlockStore localFileStore = injector.getInstance(LocalFileStore.class);
     shuffle(localFileStore, localFileStore);
@@ -297,6 +300,7 @@ public final class BlockStoreTest {
     injector.bindVolatileParameter(JobConf.JobId.class, "GFS test");
     injector.bindVolatileParameter(JobConf.ExecutorId.class, executorId);
     injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+    injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
     return injector.getInstance(GlusterFileStore.class);
   }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c20c9a5..d846dc6 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -183,6 +183,7 @@ public final class DataTransferTest {
     final Configuration executorConfiguration = TANG.newConfigurationBuilder()
       .bindNamedParameter(JobConf.ExecutorId.class, executorId)
       .bindNamedParameter(MessageParameters.SenderId.class, executorId)
+      .bindNamedParameter(JobConf.MaxOffheapMb.class, "128")
       .build();
     final Injector injector = nameClientInjector.forkInjector(executorConfiguration);
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);