Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/12/03 05:07:31 UTC

[GitHub] [samza] mynameborat opened a new pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

mynameborat opened a new pull request #1450:
URL: https://github.com/apache/samza/pull/1450


   **Description**:
   The AM performs planning and job model generation for every incarnation. With AM-HA, the new job model or configuration may invalidate the containers from the previous attempt. To ensure correctness, we detect these changes and restart all containers whenever the metadata (job model or configuration) changes across attempts.
   
   **Note**: This PR is stacked on two other PRs, #1449 & #1448, that are part of the AM-HA feature work; reviewers are requested to review only the last commit, 036c068.
   
   **Changes**:
   - Detect metadata changes by reading the previous attempt's metadata from the coordinator stream and signal the CPM (see the sketch at the end of this description)
   - As part of resource requests & orchestration, skip the containers that are already running from the previous attempt, and release them if the metadata changed
   - Releasing a container signals the RM through the AMRM client, and the RM orchestrates killing the processing container. This differs from the normal `StopStreamProcessor` flow because the NMClient isn't the source of truth and has no context about containers spun up in previous attempts
   
   **Tests**: In progress
   **API Changes**: None
   **Upgrade Instructions**: None
   **Usage Instructions**: None
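
   For illustration, here is a rough sketch of the detection flow from the first bullet above. The method names (`readJobCoordinatorMetadata`, `generateJobCoordinatorMetadata`, `checkForMetadataChanges`, `writeJobCoordinatorMetadata`) come from this PR's diff and tests, but the body below is only an approximation, not the exact implementation:

       // Approximate shape of the AM-HA metadata check in ClusterBasedJobCoordinator:
       // read the metadata persisted by the previous attempt, compare it against the
       // metadata generated for this attempt, and record whether anything changed.
       void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
         JobCoordinatorMetadataManager metadataManager = createJobCoordinatorMetadataManager();
         JobCoordinatorMetadata previousMetadata = metadataManager.readJobCoordinatorMetadata();
         JobCoordinatorMetadata newMetadata = metadataManager.generateJobCoordinatorMetadata(jobModel, config);
         if (metadataManager.checkForMetadataChanges(newMetadata, previousMetadata)) {
           metadataManager.writeJobCoordinatorMetadata(newMetadata);
           metadataChangedAcrossAttempts = true; // later handed to ContainerProcessManager to force container restarts
         }
       }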





[GitHub] [samza] mynameborat commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r539474378



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -267,9 +267,12 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
-      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
-      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {
+      /*
+       * We fan out startpoint if and only if
+       *  1. Startpoint is enabled in configuration
+       *  2. If AM HA is enabled, fan out only if metadata changed

Review comment:
       yeah. Fanout messages are read as part of container initialization within OffsetManager. Containers don't read them after they have started, so it is guaranteed that the fanout message for a task will not be seen by an existing container to which the task is assigned.







[GitHub] [samza] mynameborat commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538972316



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -260,8 +267,9 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
+      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
+      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {

Review comment:
       Thanks for pointing this out. I intended to handle it as part of this PR.
   Fixed it and added unit tests.







[GitHub] [samza] mynameborat removed a comment on pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat removed a comment on pull request #1450:
URL: https://github.com/apache/samza/pull/1450#issuecomment-737668815


   Will handle the merge conflicts once the other dependent PRs #1448 & #1449 are committed. 





[GitHub] [samza] mynameborat commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538929697



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -456,6 +482,18 @@ StartpointManager createStartpointManager() {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {

Review comment:
       the getter is only for tests.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       1. We don't need to request these explicitly, as the stop callbacks for the running processors will eventually spin the containers up again. Additionally, not removing them would end up requesting resources for those already-running containers and double-counting some state.
   2. I initially had a log here, but a log immediately follows in `stopStreamProcessor(..)` and there is nothing in between, hence decided against it.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              resource.getContainerId(), resource.getHost());
+          amClient.releaseAssignedContainer(yarnContainer.id());

Review comment:
       It follows a workflow similar to `StopStreamProcessor`: the AM is notified through the AMRM client callback `onContainersCompleted`, which treats the container as stopped regardless of which workflow triggered the stop, so you get all the niceties of updating state and instantiating a new container in its place.
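
   For illustration only, a simplified sketch of that callback path: `onContainersCompleted` is YARN's `AMRMClientAsync.CallbackHandler` method, while `markProcessorStopped` is a hypothetical placeholder for the existing completion handling (update state, let the allocator request a replacement), not an actual Samza method.

       // Sketch: whether the container was stopped via NMClient or released via the
       // AMRM client, YARN reports its completion here, so downstream handling is identical.
       @Override
       public void onContainersCompleted(List<ContainerStatus> statuses) {
         for (ContainerStatus status : statuses) {
           // hypothetical helper standing in for the existing completion handling
           markProcessorStopped(status.getContainerId(), status.getExitStatus());
         }
       }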

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());

Review comment:
       Sync'd offline on this. It only uses processorId and the prototype was using a buggy version which populated it with containerId. Fixed it.
   
   Thanks for calling this out 👍 

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {

Review comment:
       The call site for this method in the AM HA flow is behind the config. Hence dropping that here for brevity and readability. 







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538891696



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       1. wait, if we have to restart all containers then why are we asking the allocator for resources for only some of the processors... is it to avoid a scenario where we spin up a processor with id 0 even though there is a processor with the same id from the previous attempt, leading to orphan container issues?
   
   2. would benefit from a log here too, i feel.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -456,6 +482,18 @@ StartpointManager createStartpointManager() {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager);
+    return new ContainerProcessManager(config, state, metrics, containerPlacementMetadataStore, localityManager,
+        metadataChangedAcrossAttempts);
+  }
+
+  @VisibleForTesting
+  JobCoordinatorMetadataManager createJobCoordinatorMetadataManager() {
+    return new JobCoordinatorMetadataManager(new NamespaceAwareCoordinatorStreamStore(metadataStore,
+        SetJobCoordinatorMetadataMessage.TYPE), JobCoordinatorMetadataManager.ClusterType.YARN, metrics);
+  }
+
+  @VisibleForTesting
+  boolean isMetadataChangedAcrossAttempts() {

Review comment:
       is this only for tests?

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());

Review comment:
       major: hmm.. some confusion here.. resource.getContainerId() gets the yarn container id right (of the form `container_1350670447861_0003_01_000001`).  see one ex [here ](https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java#L488)
   
   But if we see code where the runningProcessors gets populated -- it has key as samza processorId (of the form 0) -- see [here ](https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java#L738)
   
   but here you are using container id to fetch from runningProcessors.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {

Review comment:
       should we put this block behind the AM-HA config?







[GitHub] [samza] mynameborat commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538943207



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -260,8 +267,9 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
+      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
+      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {

Review comment:
       yeah. We will not fan out startpoints since there may be containers that are already running, in which case the newly instantiated containers would act on the startpoints but the old ones wouldn't. As a result, we skip the fanout, and applications will force a job restart through a new deployment, as they do today.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r539442040



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -267,9 +267,12 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
-      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
-      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {
+      /*
+       * We fan out startpoint if and only if
+       *  1. Startpoint is enabled in configuration
+       *  2. If AM HA is enabled, fan out only if metadata changed

Review comment:
       so if metadata has changed, then the JC will stop all running processors, right? additionally, if there are startpoints to fan out, then they are assigned to the respective tasks. I am inferring that since the fanouts for a task are fetched before the processor enters the run loop, the previously-running-now-restarted processors will pick them up. Am I correct? just trying to ensure we don't lose the fanouts for tasks that are part of already running processors.
   
   minor: if AM HA is enabled: fan out ONLY if `startpoint enabled && metadata changed`
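
   For reference, the condition being discussed could be written roughly as below (a sketch using the config accessors quoted elsewhere in this PR; the final code may read differently):

       // Fan out startpoints iff startpoints are enabled and, when AM HA is on,
       // only when the job coordinator metadata changed across attempts.
       boolean shouldFanOutStartpoints = new JobConfig(config).getStartpointEnabled()
           && (!new JobConfig(config).getApplicationMasterHighAvailabilityEnabled() || metadataChangedAcrossAttempts);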







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538940474



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -260,8 +267,9 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
-      // fan out the startpoints if startpoints is enabled
-      if (new JobConfig(config).getStartpointEnabled()) {
+      // fan out the startpoints if startpoints is enabled and if the metadata changed across attempts.
+      // the metadata changed should be false and only get evaluated if job coordinator high availability is enabled.
+      if (new JobConfig(config).getStartpointEnabled() && !metadataChangedAcrossAttempts) {

Review comment:
       what if startpoint is enabled and metadata has changed? do we just ignore it? as in, for AM-HA with metadata changes we don't create the startpoint manager? if we are planning another PR for this then let's leave a comment saying TBD.
   
   sorry, missed this in the first pass.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -248,16 +253,23 @@ public void start() {
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
-    state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) -> {
-      String host = Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+    state.jobModelManager.jobModel().getContainers().keySet().forEach((processorId) -> {
+      String host = Optional.ofNullable(localityModel.getProcessorLocality(processorId))
           .map(ProcessorLocality::host)
           .filter(StringUtils::isNotBlank)
           .orElse(null);
-      processorToHost.put(containerId, host);
+      processorToHost.put(processorId, host);
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      state.runningProcessors.forEach((processorId, samzaResource) -> {
+        LOG.info("Not requesting container for processorId: {} since its already running as containerId: {}",
+            processorId, samzaResource.getContainerId());
+        processorToHost.remove(processorId);
+        if (restartContainers) {
+          clusterResourceManager.stopStreamProcessor(samzaResource);

Review comment:
       thanks for clarifying

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              resource.getContainerId(), resource.getHost());
+          amClient.releaseAssignedContainer(yarnContainer.id());

Review comment:
       awesome! thanks for telling me. 







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1450:
URL: https://github.com/apache/samza/pull/1450#discussion_r538903941



##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
##########
@@ -192,4 +196,38 @@ public void testToArgs() {
     assertEquals(expected.size(), actual.size());
     assertTrue(actual.containsAll(expected));
   }
+
+  @Test
+  public void testGenerateAndUpdateJobCoordinatorMetadata() {
+    Config jobConfig = new MapConfig(configMap);
+    when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(jobConfig);
+    ClusterBasedJobCoordinator clusterBasedJobCoordinator =
+        spy(ClusterBasedJobCoordinatorRunner.createFromMetadataStore(jobConfig));
+
+    JobCoordinatorMetadata previousMetadata = mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadata newMetadata = mock(JobCoordinatorMetadata.class);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = mock(JobCoordinatorMetadataManager.class);
+    JobModel mockJobModel = mock(JobModel.class);
+
+    when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
+    when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(), any())).thenReturn(newMetadata);
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata)).thenReturn(false);
+    when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
+
+    /*
+     * Verify if there are no changes to metadata, the metadata changed flag remains false and no interactions
+     * with job coordinator metadata manager
+     */
+    clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertFalse("JC metadata should remain unchanged", clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
+    verify(jobCoordinatorMetadataManager, times(0)).writeJobCoordinatorMetadata(any());
+
+    /*
+     * Verify if there are changes to metadata, we persist the new metadata & update the metadata changed flag
+     */
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata)).thenReturn(true);
+    clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
+    assertTrue("JC metadata should be true", clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());

Review comment:
       nit: add "changed" so the assertion message reads "JC metadata changed should be true"

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
##########
@@ -272,10 +273,60 @@ public void run() {
 
     // Verify only 1 was requested with allocator
     assertEquals(1, allocator.requestedContainers);
+    assertTrue("Ensure no processors were forcefully restarted", callback.resourceStatuses.isEmpty());
 
     cpm.stop();
   }
 
+  @Test
+  public void testOnInitToForceRestartAMHighAvailability() throws Exception {
+    Map<String, String> configMap = new HashMap<>(configVals);
+    configMap.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");
+    Config conf = new MapConfig(configMap);
+
+    SamzaApplicationState state = new SamzaApplicationState(getJobModelManager(2));
+    state.runningProcessors.put("0", new SamzaResource(1, 1024, "host", "0"));
+
+    MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
+    ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
+    ContainerManager containerManager =
+        buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
+
+    ContainerProcessManager cpm =
+        buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty(), true);
+
+    MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
+        clusterResourceManager,
+        conf,
+        state,
+        containerManager);
+
+    getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
+    CountDownLatch latch = new CountDownLatch(1);
+    getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
+      public void run() {
+        isRunning = true;
+        latch.countDown();
+      }
+    });
+
+    cpm.start();

Review comment:
       should we also check if the ClusterResourceManager.stopStreamProcessor was invoked and with the correct SamzaResource?

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -331,9 +332,24 @@ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder
   public void stopStreamProcessor(SamzaResource resource) {
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
+      /*
+       * 1. Stop the container through NMClient if the container was instantiated as part of NMClient lifecycle.
+       * 2. Stop the container through AMClient by releasing the assigned container if the container was from the previous
+       *    attempt and managed by the AM due to AM-HA.
+       * 3. Ignore the request if the container associated with the resource isn't present in the bookkeeping.
+       */
       if (container != null) {
         log.info("Stopping Container ID: {} on host: {}", resource.getContainerId(), resource.getHost());
         this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+      } else {
+        YarnContainer yarnContainer = state.runningProcessors.get(resource.getContainerId());
+        if (yarnContainer != null) {
+          log.info("Stopping container from previous attempt with Container ID: {} on host: {}",
+              resource.getContainerId(), resource.getHost());
+          amClient.releaseAssignedContainer(yarnContainer.id());

Review comment:
       after amClient.releaseAssignedContainer(yarnContainer.id()) is done, what do we get the callback as?
   because we should use that callback to restart the container, right (as earlier we allocated resources only for processors not running in the previous attempt)?







[GitHub] [samza] mynameborat merged pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1450:
URL: https://github.com/apache/samza/pull/1450


   





[GitHub] [samza] mynameborat commented on pull request #1450: SAMZA-2610: Handle Metadata changes for AM HA orchestration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1450:
URL: https://github.com/apache/samza/pull/1450#issuecomment-737668815


   Will handle the merge conflicts once the other dependent PRs #1448 & #1449 are committed. 

