You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/11/17 18:02:37 UTC

[samza] branch master updated: SAMZA-2600: Extract constants for string literals used in AM and container (#1439)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 120f2fb  SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
120f2fb is described below

commit 120f2fb5caf9f0606a7ee233e7445c4db469f413
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Tue Nov 17 09:56:49 2020 -0800

    SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
    
    * Extract constants for string literals used in AM and container
    * Rename samza.autoscaling.server.url to yarn.am.tracking.url
---
 .../org/apache/samza/container/ContainerHeartbeatClient.java     | 3 ++-
 .../java/org/apache/samza/coordinator/CoordinationConstants.java | 9 +++++++++
 .../org/apache/samza/webapp/YarnContainerHeartbeatServlet.java   | 4 ++--
 .../org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala    | 6 ++++--
 .../apache/samza/webapp/TestYarnContainerHeartbeatServlet.java   | 6 ++++--
 5 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index e6208ba..8a90549 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.stream.Collectors;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -56,7 +57,7 @@ public class ContainerHeartbeatClient {
 
   public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
     this.heartbeatEndpoint =
-        String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+        String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, coordinatorUrl, executionEnvContainerId);
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index d7a648b..22268a8 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -27,4 +27,13 @@ public final class CoordinationConstants {
   public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
   public static final String RUNID_LOCK_ID = "runId";
   public static final int LOCK_TIMEOUT_MS = 300000;
+
+  // Yarn coordination constants for heartbeat
+  public static final String YARN_CONTAINER_HEARTBEAT_SERVELET = "containerHeartbeat";
+  public static final String YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID = "executionContainerId";
+  public static final String YARN_COORDINATOR_URL = "yarn.am.tracking.url";
+  private static final String YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT = "%s" + YARN_CONTAINER_HEARTBEAT_SERVELET;
+  private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
+  public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
+      YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
index 3ed3928..b683d35 100644
--- a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -47,7 +48,6 @@ import org.slf4j.LoggerFactory;
  */
 public class YarnContainerHeartbeatServlet extends HttpServlet {
 
-  private static final String YARN_CONTAINER_ID = "executionContainerId";
   private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
   private static final String APPLICATION_JSON = "application/json";
   private static final String GROUP = SamzaAppMasterMetrics.class.getName();
@@ -67,7 +67,7 @@ public class YarnContainerHeartbeatServlet extends HttpServlet {
       throws ServletException, IOException {
     ContainerId yarnContainerId;
     PrintWriter printWriter = resp.getWriter();
-    String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+    String containerIdParam = req.getParameter(CoordinationConstants.YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID);
     ContainerHeartbeatResponse response;
     resp.setContentType(APPLICATION_JSON);
     boolean alive = false;
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f436f79..0f512ad 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
+import org.apache.samza.coordinator.CoordinationConstants
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
@@ -40,7 +41,7 @@ import org.apache.samza.webapp.{ApplicationMasterRestServlet, ApplicationMasterW
 class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends  Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
-  val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  val SERVER_URL_OPT: String = CoordinationConstants.YARN_COORDINATOR_URL;
   var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
   def onInit() {
@@ -56,7 +57,8 @@ class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationS
     webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
     webApp.start
 
-    samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
+    samzaAppState.jobModelManager.server.addServlet("/" + CoordinationConstants.YARN_CONTAINER_HEARTBEAT_SERVELET,
+      new YarnContainerHeartbeatServlet(state, registry))
     samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index 8987834..0901d85 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import junit.framework.Assert;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
@@ -76,7 +77,7 @@ public class TestYarnContainerHeartbeatServlet {
     String validContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + validContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, webApp.getUrl().toString(), validContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertTrue(heartbeat.isAlive());
@@ -89,7 +90,8 @@ public class TestYarnContainerHeartbeatServlet {
     String invalidContainerId = "container_1350670447861_0003_01_000002";
     when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
     yarnAppState.runningProcessors.put(validContainerId, container);
-    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + invalidContainerId);
+    URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT,
+        webApp.getUrl().toString(), invalidContainerId));
     String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertFalse(heartbeat.isAlive());