You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2019/08/23 07:09:28 UTC
[incubator-nemo] branch master updated: [NEMO-414] Command-line
specified runtime data plane configurations not applied (#235)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new ebaf109 [NEMO-414] Command-line specified runtime data plane configurations not applied (#235)
ebaf109 is described below
commit ebaf109a0880bce49b7c2fd45f795d875a6945be
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Fri Aug 23 16:09:23 2019 +0900
[NEMO-414] Command-line specified runtime data plane configurations not applied (#235)
JIRA: [NEMO-414: Command-line specified runtime data plane configurations not applied](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-414)
**Major changes:**
- User-specified data plane configurations are now applied to the corresponding runtime parameters of the executors.
**Minor changes to note:**
- The number of serialization threads for scheduling (`schedule_ser_thread`) can now be set through a command-line argument.
---
.../java/org/apache/nemo/client/JobLauncher.java | 1 +
.../java/org/apache/nemo/conf/DataPlaneConf.java | 72 ++++++++++++++++++++++
.../java/org/apache/nemo/driver/NemoDriver.java | 8 ++-
3 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 573ab9c..e9f3394 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -417,6 +417,7 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
+ cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class);
cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
cl.processCommandLine(args);
diff --git a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
new file mode 100644
index 0000000..b48c92a
--- /dev/null
+++ b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.conf;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Data plane Configuration for Executors.
+ */
+public final class DataPlaneConf {
+
+ private final int numIOThreads;
+ private final int maxNumDownloads;
+ private final int scheduleSerThread;
+ private final int serverPort;
+ private final int clientNumThreads;
+ private final int serverBackLog;
+ private final int listenThreads;
+ private final int workThreads;
+
+ @Inject
+ private DataPlaneConf(@Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numIOThreads,
+ @Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNumDownloads,
+ @Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
+ @Parameter(JobConf.PartitionTransportServerPort.class) final int serverPort,
+ @Parameter(JobConf.PartitionTransportClientNumThreads.class) final int clientNumThreads,
+ @Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBackLog,
+ @Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int listenThreads,
+ @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads) {
+ this.numIOThreads = numIOThreads;
+ this.maxNumDownloads = maxNumDownloads;
+ this.scheduleSerThread = scheduleSerThread;
+ this.serverPort = serverPort;
+ this.clientNumThreads = clientNumThreads;
+ this.serverBackLog = serverBackLog;
+ this.listenThreads = listenThreads;
+ this.workThreads = workThreads;
+ }
+
+ public Configuration getDataPlaneConfiguration() {
+ return Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(JobConf.IORequestHandleThreadsTotal.class, Integer.toString(numIOThreads))
+ .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, Integer.toString(maxNumDownloads))
+ .bindNamedParameter(JobConf.ScheduleSerThread.class, Integer.toString(scheduleSerThread))
+ .bindNamedParameter(JobConf.PartitionTransportServerPort.class, Integer.toString(serverPort))
+ .bindNamedParameter(JobConf.PartitionTransportClientNumThreads.class, Integer.toString(clientNumThreads))
+ .bindNamedParameter(JobConf.PartitionTransportServerBacklog.class, Integer.toString(serverBackLog))
+ .bindNamedParameter(JobConf.PartitionTransportServerNumListeningThreads.class, Integer.toString(listenThreads))
+ .bindNamedParameter(JobConf.PartitionTransportServerNumWorkingThreads.class, Integer.toString(workThreads))
+ .build();
+ }
+ }
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
index afca55a..3b43abd 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.ResourceSitePass;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.XGBoostPass;
+import org.apache.nemo.conf.DataPlaneConf;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -80,6 +81,7 @@ public final class NemoDriver {
private final String localDirectory;
private final String glusterDirectory;
private final ClientRPC clientRPC;
+ private final DataPlaneConf dataPlaneConf;
private static ExecutorService runnerThread = Executors.newSingleThreadExecutor(
new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
@@ -94,6 +96,7 @@ public final class NemoDriver {
final LocalAddressProvider localAddressProvider,
final JobMessageObserver client,
final ClientRPC clientRPC,
+ final DataPlaneConf dataPlaneConf,
@Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
@Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
@Parameter(JobConf.JobId.class) final String jobId,
@@ -110,6 +113,7 @@ public final class NemoDriver {
this.glusterDirectory = glusterDirectory;
this.handler = new RemoteClientMessageLoggingHandler(client);
this.clientRPC = clientRPC;
+ this.dataPlaneConf = dataPlaneConf;
// TODO #69: Support job-wide execution property
ResourceSitePass.setBandwidthSpecificationString(bandwidthString);
clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification);
@@ -259,8 +263,10 @@ public final class NemoDriver {
final Configuration ncsConfiguration = getExecutorNcsConfiguration();
final Configuration messageConfiguration = getExecutorMessageConfiguration(executorId);
+ final Configuration dataPlaneConfiguration = dataPlaneConf.getDataPlaneConfiguration();
- return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration);
+ return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration,
+ messageConfiguration, dataPlaneConfiguration);
}
private Configuration getExecutorNcsConfiguration() {