You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2019/08/23 07:09:28 UTC

[incubator-nemo] branch master updated: [NEMO-414] Command-line specified runtime data plane configurations not applied (#235)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new ebaf109  [NEMO-414] Command-line specified runtime data plane configurations not applied (#235)
ebaf109 is described below

commit ebaf109a0880bce49b7c2fd45f795d875a6945be
Author: Haeyoon Cho <ch...@gmail.com>
AuthorDate: Fri Aug 23 16:09:23 2019 +0900

    [NEMO-414] Command-line specified runtime data plane configurations not applied (#235)
    
    JIRA: [NEMO-414: Command-line specified runtime data plane configurations not applied](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-414)
    
    **Major changes:**
    - Data plane configurations specified by the user on the command line are now applied to the corresponding parameters.
    
    **Minor changes to note:**
    - The number of serialization threads for scheduling (schedule_ser_thread) can now be set through a command-line argument.
---
 .../java/org/apache/nemo/client/JobLauncher.java   |  1 +
 .../java/org/apache/nemo/conf/DataPlaneConf.java   | 72 ++++++++++++++++++++++
 .../java/org/apache/nemo/driver/NemoDriver.java    |  8 ++-
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 573ab9c..e9f3394 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -417,6 +417,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
     cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
+    cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class);
     cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
     cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
     cl.processCommandLine(args);
diff --git a/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
new file mode 100644
index 0000000..b48c92a
--- /dev/null
+++ b/conf/src/main/java/org/apache/nemo/conf/DataPlaneConf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.conf;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Data plane configuration for executors: holds injected JobConf values and re-binds them into a Tang Configuration.
+ */
+public final class DataPlaneConf {
+
+  private final int numIOThreads; // JobConf.IORequestHandleThreadsTotal
+  private final int maxNumDownloads; // JobConf.MaxNumDownloadsForARuntimeEdge
+  private final int scheduleSerThread; // JobConf.ScheduleSerThread
+  private final int serverPort; // JobConf.PartitionTransportServerPort
+  private final int clientNumThreads; // JobConf.PartitionTransportClientNumThreads
+  private final int serverBackLog; // JobConf.PartitionTransportServerBacklog
+  private final int listenThreads; // JobConf.PartitionTransportServerNumListeningThreads
+  private final int workThreads; // JobConf.PartitionTransportServerNumWorkingThreads
+  /** Injectable private constructor; each argument is tagged with the JobConf named parameter it corresponds to. */
+  @Inject
+  private DataPlaneConf(@Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numIOThreads,
+                        @Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNumDownloads,
+                        @Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
+                        @Parameter(JobConf.PartitionTransportServerPort.class) final int serverPort,
+                        @Parameter(JobConf.PartitionTransportClientNumThreads.class) final int clientNumThreads,
+                        @Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBackLog,
+                        @Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int listenThreads,
+                        @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int workThreads) {
+    this.numIOThreads = numIOThreads;
+    this.maxNumDownloads = maxNumDownloads;
+    this.scheduleSerThread = scheduleSerThread;
+    this.serverPort = serverPort;
+    this.clientNumThreads = clientNumThreads;
+    this.serverBackLog = serverBackLog;
+    this.listenThreads = listenThreads;
+    this.workThreads = workThreads;
+  }
+  /** Returns a new Tang Configuration binding every stored value, as a string, to its JobConf named parameter. */
+  public Configuration getDataPlaneConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder()
+      .bindNamedParameter(JobConf.IORequestHandleThreadsTotal.class, Integer.toString(numIOThreads))
+      .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, Integer.toString(maxNumDownloads))
+      .bindNamedParameter(JobConf.ScheduleSerThread.class, Integer.toString(scheduleSerThread))
+      .bindNamedParameter(JobConf.PartitionTransportServerPort.class, Integer.toString(serverPort))
+      .bindNamedParameter(JobConf.PartitionTransportClientNumThreads.class, Integer.toString(clientNumThreads))
+      .bindNamedParameter(JobConf.PartitionTransportServerBacklog.class, Integer.toString(serverBackLog))
+      .bindNamedParameter(JobConf.PartitionTransportServerNumListeningThreads.class, Integer.toString(listenThreads))
+      .bindNamedParameter(JobConf.PartitionTransportServerNumWorkingThreads.class, Integer.toString(workThreads))
+      .build();
+  }
+ }
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
index afca55a..3b43abd 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.nemo.common.ir.IdManager;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.ResourceSitePass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.XGBoostPass;
+import org.apache.nemo.conf.DataPlaneConf;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -80,6 +81,7 @@ public final class NemoDriver {
   private final String localDirectory;
   private final String glusterDirectory;
   private final ClientRPC clientRPC;
+  private final DataPlaneConf dataPlaneConf;
 
   private static ExecutorService runnerThread = Executors.newSingleThreadExecutor(
     new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
@@ -94,6 +96,7 @@ public final class NemoDriver {
                      final LocalAddressProvider localAddressProvider,
                      final JobMessageObserver client,
                      final ClientRPC clientRPC,
+                     final DataPlaneConf dataPlaneConf,
                      @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
                      @Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
                      @Parameter(JobConf.JobId.class) final String jobId,
@@ -110,6 +113,7 @@ public final class NemoDriver {
     this.glusterDirectory = glusterDirectory;
     this.handler = new RemoteClientMessageLoggingHandler(client);
     this.clientRPC = clientRPC;
+    this.dataPlaneConf = dataPlaneConf;
     // TODO #69: Support job-wide execution property
     ResourceSitePass.setBandwidthSpecificationString(bandwidthString);
     clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.Notification, this::handleNotification);
@@ -259,8 +263,10 @@ public final class NemoDriver {
 
     final Configuration ncsConfiguration = getExecutorNcsConfiguration();
     final Configuration messageConfiguration = getExecutorMessageConfiguration(executorId);
+    final Configuration dataPlaneConfiguration = dataPlaneConf.getDataPlaneConfiguration();
 
-    return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration, messageConfiguration);
+    return Configurations.merge(executorConfiguration, contextConfiguration, ncsConfiguration,
+      messageConfiguration, dataPlaneConfiguration);
   }
 
   private Configuration getExecutorNcsConfiguration() {