You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/11/17 18:02:37 UTC
[samza] branch master updated: SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 120f2fb SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
120f2fb is described below
commit 120f2fb5caf9f0606a7ee233e7445c4db469f413
Author: lakshmi-manasa-g <mg...@linkedin.com>
AuthorDate: Tue Nov 17 09:56:49 2020 -0800
SAMZA-2600: Extract constants for string literals used in AM and container (#1439)
* Extract constants for string literals used in AM and container
* Rename samza.autoscaling.server.url to yarn.am.tracking.url
---
.../org/apache/samza/container/ContainerHeartbeatClient.java | 3 ++-
.../java/org/apache/samza/coordinator/CoordinationConstants.java | 9 +++++++++
.../org/apache/samza/webapp/YarnContainerHeartbeatServlet.java | 4 ++--
.../org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala | 6 ++++--
.../apache/samza/webapp/TestYarnContainerHeartbeatServlet.java | 6 ++++--
5 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index e6208ba..8a90549 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.util.HttpUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -56,7 +57,7 @@ public class ContainerHeartbeatClient {
public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
this.heartbeatEndpoint =
- String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+ String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, coordinatorUrl, executionEnvContainerId);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index d7a648b..22268a8 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -27,4 +27,13 @@ public final class CoordinationConstants {
public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
public static final String RUNID_LOCK_ID = "runId";
public static final int LOCK_TIMEOUT_MS = 300000;
+
+ // Yarn coordination constants for heartbeat
+ public static final String YARN_CONTAINER_HEARTBEAT_SERVELET = "containerHeartbeat";
+ public static final String YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID = "executionContainerId";
+ public static final String YARN_COORDINATOR_URL = "yarn.am.tracking.url";
+ private static final String YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT = "%s" + YARN_CONTAINER_HEARTBEAT_SERVELET;
+ private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
+ public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
+ YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
}
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
index 3ed3928..b683d35 100644
--- a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -27,6 +27,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
@@ -47,7 +48,6 @@ import org.slf4j.LoggerFactory;
*/
public class YarnContainerHeartbeatServlet extends HttpServlet {
- private static final String YARN_CONTAINER_ID = "executionContainerId";
private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
private static final String APPLICATION_JSON = "application/json";
private static final String GROUP = SamzaAppMasterMetrics.class.getName();
@@ -67,7 +67,7 @@ public class YarnContainerHeartbeatServlet extends HttpServlet {
throws ServletException, IOException {
ContainerId yarnContainerId;
PrintWriter printWriter = resp.getWriter();
- String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+ String containerIdParam = req.getParameter(CoordinationConstants.YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID);
ContainerHeartbeatResponse response;
resp.setContentType(APPLICATION_JSON);
boolean alive = false;
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index f436f79..0f512ad 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.samza.clustermanager.SamzaApplicationState
import org.apache.samza.config.Config
+import org.apache.samza.coordinator.CoordinationConstants
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
import org.apache.samza.coordinator.stream.messages.SetConfig
@@ -40,7 +41,7 @@ import org.apache.samza.webapp.{ApplicationMasterRestServlet, ApplicationMasterW
class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends Logging {
var rpcApp: HttpServer = null
var webApp: HttpServer = null
- val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+ val SERVER_URL_OPT: String = CoordinationConstants.YARN_COORDINATOR_URL;
var securityManager: Option[SamzaAppMasterSecurityManager] = None
def onInit() {
@@ -56,7 +57,8 @@ class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationS
webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
webApp.start
- samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
+ samzaAppState.jobModelManager.server.addServlet("/" + CoordinationConstants.YARN_CONTAINER_HEARTBEAT_SERVELET,
+ new YarnContainerHeartbeatServlet(state, registry))
samzaAppState.jobModelManager.start
state.rpcUrl = rpcApp.getUrl
state.trackingUrl = webApp.getUrl
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index 8987834..0901d85 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -24,6 +24,7 @@ import java.net.URL;
import junit.framework.Assert;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.job.yarn.YarnAppState;
import org.apache.samza.job.yarn.YarnContainer;
@@ -76,7 +77,7 @@ public class TestYarnContainerHeartbeatServlet {
String validContainerId = "container_1350670447861_0003_01_000002";
when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
yarnAppState.runningProcessors.put(validContainerId, container);
- URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + validContainerId);
+ URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT, webApp.getUrl().toString(), validContainerId));
String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
Assert.assertTrue(heartbeat.isAlive());
@@ -89,7 +90,8 @@ public class TestYarnContainerHeartbeatServlet {
String invalidContainerId = "container_1350670447861_0003_01_000002";
when(container.id()).thenReturn(ConverterUtils.toContainerId(validContainerId));
yarnAppState.runningProcessors.put(validContainerId, container);
- URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + invalidContainerId);
+ URL url = new URL(String.format(CoordinationConstants.YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT,
+ webApp.getUrl().toString(), invalidContainerId));
String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
Assert.assertFalse(heartbeat.isAlive());