You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/05/05 00:08:08 UTC

[samza] branch master updated: SAMZA-2654: Allow coordinator url port to be configurable (#1499)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 65e51ec  SAMZA-2654: Allow coordinator url port to be configurable (#1499)
65e51ec is described below

commit 65e51ecb748ae9910c3131458fba932f4f15a67d
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue May 4 17:08:01 2021 -0700

    SAMZA-2654: Allow coordinator url port to be configurable (#1499)
    
    Issues: Currently, the port for the job coordinator URL (used to access the job model) is always dynamically allocated. In some cases, it is helpful to be able to hardcode a port.
    
    Changes: When JobModelManager sets up the HttpServer that serves the coordinator URL, it now reads a config to set the port.
    
    API changes:
    Set the config cluster-manager.jobcoordinator.url.port to the port number to use for the coordinator URL. This is backwards compatible: the default value is 0 (which dynamically allocates an unused port), and 0 is the value that was used before this change.
    If the specified port is unavailable, an exception will be thrown at startup. Therefore, this configuration should only be used when a specific port is needed and the port is known not to be in use already. In many cases, dynamic port allocation (i.e. leaving cluster-manager.jobcoordinator.url.port unset) is the better choice, since it prevents failures due to port conflicts.
    
    See https://github.com/apache/samza/pull/1499 for more details about use cases.
---
 .../org/apache/samza/config/ClusterManagerConfig.java  | 18 ++++++++++++++++++
 .../org/apache/samza/coordinator/JobModelManager.scala |  3 ++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 3f27991..acb5a0a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -137,6 +137,20 @@ public class ClusterManagerConfig extends MapConfig {
   private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
   private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
 
+  /**
+   * Use this to configure a static port for the job coordinator url for a Samza job. This url is used to provide
+   * information such as job model and locality.
+   * If the value is set to 0, then the port will be dynamically allocated from the available free ports on the node.
+   * The default value of this config is 0.
+   *
+   * Be careful when using this configuration. If the configured port is already in use on the node, then the job
+   * coordinator will fail to start.
+   *
+   * This configuration is experimental, and it might be removed in a future release.
+   */
+  private static final String JOB_COORDINATOR_URL_PORT = "cluster-manager.jobcoordinator.url.port";
+  private static final int DEFAULT_JOB_COORDINATOR_URL_PORT = 0;
+
   public ClusterManagerConfig(Config config) {
       super(config);
   }
@@ -280,4 +294,8 @@ public class ClusterManagerConfig extends MapConfig {
       return true;
     }
   }
+
+  public int getCoordinatorUrlPort() {
+    return getInt(JOB_COORDINATOR_URL_PORT, DEFAULT_JOB_COORDINATOR_URL_PORT);
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 7c0e747..5dd662b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -100,7 +100,8 @@ object JobModelManager extends Logging {
 
       updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
 
-      val server = new HttpServer
+      val clusterManagerConfig = new ClusterManagerConfig(config)
+      val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
       server.addServlet("/", new JobServlet(serializedJobModelRef))
       server.addServlet("/locality", new LocalityServlet(localityManager))