You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by gw...@apache.org on 2019/08/26 03:01:24 UTC
[incubator-nemo] branch master updated: [NEMO-397] Separation of
JVM heap region and off-heap memory region (#236)
This is an automated email from the ASF dual-hosted git repository.
gwlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new bd70bb2 [NEMO-397] Separation of JVM heap region and off-heap memory region (#236)
bd70bb2 is described below
commit bd70bb2502c87ba485fdcef76314c9e97fa067cc
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Mon Aug 26 12:01:19 2019 +0900
[NEMO-397] Separation of JVM heap region and off-heap memory region (#236)
JIRA: [NEMO-397: Separation of JVM heap region and off-heap memory region](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-397)
**Major changes:**
- Off-heap memory ratio option added
- Slack region reserved for the off-heap memory region by adding the off-heap memory ratio to the JVM heap slack
Closes #236
---
.../java/org/apache/nemo/client/JobLauncher.java | 35 ++++++++++++++++++++--
.../java/org/apache/nemo/conf/DataPlaneConf.java | 10 ++++++-
.../main/java/org/apache/nemo/conf/JobConf.java | 13 ++++++--
.../nemo/runtime/executor/data/BlockStoreTest.java | 4 +++
.../executor/datatransfer/DataTransferTest.java | 1 +
5 files changed, 56 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index e9f3394..0211122 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -18,6 +18,8 @@
*/
package org.apache.nemo.client;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
@@ -147,6 +149,8 @@ public final class JobLauncher {
+ "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
+ final Configuration offheapMemoryConfig = getMemoryConf(builtJobConf, executorResourceConfig,
+ JobConf.ExecutorJSONContents.class, JobConf.MaxOffheapRatio.class, JobConf.MaxOffheapMb.class);
final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
JobConf.BandwidthJSONContents.class, "");
final Configuration clientConf = getClientConf();
@@ -154,7 +158,8 @@ public final class JobLauncher {
// Merge Job and Driver Confs
jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfig,
- executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
+ executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf,
+ offheapMemoryConfig);
// Get DeployMode Conf
deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -418,7 +423,7 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class);
- cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
+ cl.registerShortNameOfClass(JobConf.MaxOffheapRatio.class);
cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
cl.processCommandLine(args);
return confBuilder.build();
@@ -441,7 +446,9 @@ public final class JobLauncher {
.build();
case "yarn":
return YarnClientConfiguration.CONF
- .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class))
+ .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class)
+ + injector.getNamedInstance(JobConf.MaxOffheapRatio.class))
+ // Off-heap memory size is added to memory slack so that JVM heap region does not invade the off-heap region.
.build();
default:
throw new UnsupportedOperationException(deployMode);
@@ -476,6 +483,28 @@ public final class JobLauncher {
}
}
+ private static Configuration getMemoryConf(final Configuration jobConf,
+ final Configuration executorConf,
+ final Class<? extends Name<String>> contentsParameter,
+ final Class<? extends Name<Double>> offHeapRatio,
+ final Class<? extends Name<Integer>> maxOffHeapMb)
+ throws InjectionException {
+ final Injector injector = TANG.newInjector(Configurations.merge(jobConf, executorConf));
+ try {
+ final String contents = injector.getNamedInstance(contentsParameter);
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final TreeNode jsonRootNode = objectMapper.readTree(contents);
+ final TreeNode resourceNode = jsonRootNode.get(0);
+ final int executorMemory = resourceNode.get("memory_mb").traverse().getIntValue();
+ final int offHeapMemory = (int) (executorMemory * injector.getNamedInstance(offHeapRatio));
+ return TANG.newConfigurationBuilder()
+ .bindNamedParameter(maxOffHeapMb, String.valueOf(offHeapMemory))
+ .build();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Get the built job configuration.
* It can be {@code null} if this method is not called by the process which called the main function of this class.
diff --git a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
index b48c92a..fe1d0df 100644
--- a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
@@ -37,6 +37,8 @@ public final class DataPlaneConf {
private final int serverBackLog;
private final int listenThreads;
private final int workThreads;
+ private final int maxOffHeapMb;
+ private final int chunkSizeKb;
@Inject
private DataPlaneConf(@Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numIOThreads,
@@ -46,7 +48,9 @@ public final class DataPlaneConf {
@Parameter(JobConf.PartitionTransportClientNumThreads.class) final int clientNumThreads,
@Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBackLog,
@Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int listenThreads,
- @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads) {
+ @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads,
+ @Parameter(JobConf.MaxOffheapMb.class) final int maxOffHeapMb,
+ @Parameter(JobConf.ChunkSizeKb.class) final int chunkSizeKb) {
this.numIOThreads = numIOThreads;
this.maxNumDownloads = maxNumDownloads;
this.scheduleSerThread = scheduleSerThread;
@@ -55,6 +59,8 @@ public final class DataPlaneConf {
this.serverBackLog = serverBackLog;
this.listenThreads = listenThreads;
this.workThreads = workThreads;
+ this.maxOffHeapMb = maxOffHeapMb;
+ this.chunkSizeKb = chunkSizeKb;
}
public Configuration getDataPlaneConfiguration() {
@@ -67,6 +73,8 @@ public final class DataPlaneConf {
.bindNamedParameter(JobConf.PartitionTransportServerBacklog.class, Integer.toString(serverBackLog))
.bindNamedParameter(JobConf.PartitionTransportServerNumListeningThreads.class, Integer.toString(listenThreads))
.bindNamedParameter(JobConf.PartitionTransportServerNumWorkingThreads.class, Integer.toString(workThreads))
+ .bindNamedParameter(JobConf.MaxOffheapMb.class, Integer.toString(maxOffHeapMb))
+ .bindNamedParameter(JobConf.ChunkSizeKb.class, Integer.toString(chunkSizeKb))
.build();
}
}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index c93bc67..d12f191 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -300,11 +300,18 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
+ * Maximum off-heap memory ratio to the total memory in the executor.
+ */
+ @NamedParameter(doc = "The maximum ratio of off-heap memory size to the total memory size.",
+ short_name = "max_offheap_ratio", default_value = "0.2")
+ public final class MaxOffheapRatio implements Name<Double> {
+ }
+
+ /**
* Maximum off-heap memory size in the executor.
+ * This is set by the system according to the off-heap ratio.
*/
- // TODO #397: Separation of JVM heap region and off-heap memory region
- @NamedParameter(doc = "The maximum off-heap memory that can be allocated",
- short_name = "max_offheap_mb", default_value = "8192")
+ @NamedParameter(doc = "The maximum off-heap memory that can be allocated")
public final class MaxOffheapMb implements Name<Integer> {
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
index 457960f..b8a148a 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockStoreTest.java
@@ -229,6 +229,7 @@ public final class BlockStoreTest {
public void testMemoryStore() throws Exception {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+ injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore memoryStore = injector.getInstance(MemoryStore.class);
shuffle(memoryStore, memoryStore);
concurrentRead(memoryStore, memoryStore);
@@ -244,6 +245,7 @@ public final class BlockStoreTest {
public void testSerMemoryStore() throws Exception {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+ injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore serMemoryStore = injector.getInstance(SerializedMemoryStore.class);
shuffle(serMemoryStore, serMemoryStore);
concurrentRead(serMemoryStore, serMemoryStore);
@@ -261,6 +263,7 @@ public final class BlockStoreTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(JobConf.FileDirectory.class, TMP_FILE_DIRECTORY);
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+ injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore localFileStore = injector.getInstance(LocalFileStore.class);
shuffle(localFileStore, localFileStore);
@@ -297,6 +300,7 @@ public final class BlockStoreTest {
injector.bindVolatileParameter(JobConf.JobId.class, "GFS test");
injector.bindVolatileParameter(JobConf.ExecutorId.class, executorId);
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
+ injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
return injector.getInstance(GlusterFileStore.class);
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c20c9a5..d846dc6 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -183,6 +183,7 @@ public final class DataTransferTest {
final Configuration executorConfiguration = TANG.newConfigurationBuilder()
.bindNamedParameter(JobConf.ExecutorId.class, executorId)
.bindNamedParameter(MessageParameters.SenderId.class, executorId)
+ .bindNamedParameter(JobConf.MaxOffheapMb.class, "128")
.build();
final Injector injector = nameClientInjector.forkInjector(executorConfiguration);
injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);