Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/12/09 01:40:28 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

mynameborat commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538929697



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -456,6 +482,18 @@ StartpointManager createStartpointManager() {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {

Review comment:
       The getter exists only for tests.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since it's already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       1. We don't need to request these explicitly, since the stop callbacks for the running processors will eventually spin up their containers again. Additionally, not removing them from `processorToHost` would request resources for containers that are already running and double-count them in some of the state.
   2. I initially had logs here, but `stopStreamProcessor(..)` logs immediately afterwards and nothing happens in between, so I decided against it.
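The reasoning in point 1 can be modeled in a few lines. This is a minimal, hypothetical sketch, not Samza code: `HaStartupSketch`, `onContainerStopped`, and the string-based maps are illustrative stand-ins for `ContainerProcessManager`, its cluster callbacks, and `SamzaResource`. It shows that removing running processors from the request map and letting the stop callback issue the replacement yields exactly one request per processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the AM-HA start-up path in ContainerProcessManager.start().
// Plain strings stand in for Samza's SamzaResource/ClusterResourceManager; none of these
// names are Samza APIs.
final class HaStartupSketch {
  final List<String> requestedProcessorIds = new ArrayList<>(); // new container requests
  final List<String> stoppedContainerIds = new ArrayList<>();   // stop requests (restart case)

  /**
   * @param processorToHost   preferred host per processor ID (value may be null)
   * @param runningProcessors processor ID -> container ID inherited from the previous attempt
   * @param restartContainers true when the job metadata changed across attempts
   */
  void start(Map<String, String> processorToHost, Map<String, String> runningProcessors,
      boolean restartContainers) {
    Map<String, String> toRequest = new HashMap<>(processorToHost);
    runningProcessors.forEach((processorId, containerId) -> {
      // Never request a container for a processor that is already running: that would
      // allocate a duplicate and double-count the processor in the AM's state.
      toRequest.remove(processorId);
      if (restartContainers) {
        // Only stop it here; the completion callback requests the replacement.
        stoppedContainerIds.add(containerId);
      }
    });
    requestedProcessorIds.addAll(toRequest.keySet());
  }

  /** Simulated stop/completion callback: the stopped processor gets a fresh request. */
  void onContainerStopped(String processorId) {
    requestedProcessorIds.add(processorId);
  }
}
```

With `restartContainers` false, an inherited processor is simply left running; with it true, the processor is still requested exactly once, by the callback, rather than twice.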

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              resource.getContainerId(), resource.getHost());
+          amClient.releaseAssignedContainer(yarnContainer.id());

Review comment:
       It follows a workflow similar to the regular `stopStreamProcessor(..)` path: the release is reported through the AMRM client callback `onContainersCompleted`, which treats the container as stopped regardless of which workflow triggered it. Hence you get all the same niceties: the state is updated and a new container is instantiated in its place.
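The convergence described in this comment can be sketched as a small standalone model. This is hypothetical: `StopDispatchSketch`, `onContainerCompleted`, and the string maps are illustrative, and keying the inherited containers by processor ID is an assumption taken from the follow-up comment below. It mirrors the three branches in the diff and shows that one completion handler covers both stop paths.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the three branches in YarnClusterResourceManager.stopStreamProcessor(..)
// and of why both "stop" paths converge. Strings replace the YARN/Samza types; the real code
// calls nmClientAsync.stopContainerAsync(..) or amClient.releaseAssignedContainer(..).
final class StopDispatchSketch {
  // containerId -> processorId for containers launched by this AM attempt (NM-client path).
  final Map<String, String> allocatedResources = new HashMap<>();
  // processorId -> containerId for containers inherited from the previous attempt (AM-HA path).
  final Map<String, String> runningProcessors = new HashMap<>();
  final List<String> replacementRequests = new ArrayList<>(); // processor IDs

  String stopStreamProcessor(String processorId, String containerId) {
    if (allocatedResources.containsKey(containerId)) {
      return "nm-stop";      // 1. stop through the NM client
    }
    if (runningProcessors.containsKey(processorId)) {
      return "am-release";   // 2. release the inherited container through the AM-RM client
    }
    return "ignored";        // 3. not in our bookkeeping
  }

  // Simulated onContainersCompleted: invoked the same way whichever branch stopped the
  // container, so state cleanup and the replacement request live in one place.
  void onContainerCompleted(String containerId) {
    String processorId = allocatedResources.remove(containerId);
    if (processorId == null) {
      processorId = runningProcessors.entrySet().stream()
          .filter(e -> e.getValue().equals(containerId))
          .map(Map.Entry::getKey)
          .findFirst()
          .orElse(null);
      if (processorId != null) {
        runningProcessors.remove(processorId);
      }
    }
    if (processorId != null) {
      replacementRequests.add(processorId);
    }
  }
}
```

The design point is that the release branch needs no bespoke follow-up logic; correctness comes from the shared completion handler.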

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());

Review comment:
       Sync'd offline on this. The map is keyed only by processorId; the prototype was running a buggy version that populated it with containerId. Fixed it.
   
   Thanks for calling this out 👍 
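The bug discussed above, a map keyed by processorId populated or queried with containerId, fails silently because both IDs are plain `String`s and `Map.get(Object)` accepts any key. One hypothetical way to surface such a mix-up at compile time, not something this PR does, is to give each kind of ID its own type. `ProcessorId`, `ContainerId`, and `RunningProcessors` are illustrative names, not Samza classes, and the sketch assumes Java 16+ for records.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hardening, not part of this PR: distinct types for processor IDs and
// container IDs so a map keyed by processor ID cannot be queried with a container ID.
// Requires Java 16+ for records (value-based equals/hashCode come for free).
record ProcessorId(String value) {}

record ContainerId(String value) {}

final class RunningProcessors {
  private final Map<ProcessorId, ContainerId> byProcessor = new HashMap<>();

  void put(ProcessorId processorId, ContainerId containerId) {
    byProcessor.put(processorId, containerId);
  }

  // Map.get(Object) would accept any key and just return null on a mismatch; this typed
  // accessor turns a get(ContainerId) mix-up into a compile error instead.
  ContainerId get(ProcessorId processorId) {
    return byProcessor.get(processorId);
  }
}
```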

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {

Review comment:
       The call site for this method in the AM-HA flow is already behind the config, so I'm dropping the check here for brevity and readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org