Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/11/25 03:37:58 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1448: SAMZA-2606: container orchestration for AM HA

lakshmi-manasa-g opened a new pull request #1448:
URL: https://github.com/apache/samza/pull/1448


   Feature: The main feature is YARN AM high availability. It ensures that a new AM can establish a connection with already running containers, to avoid restarting all running containers when the AM dies. This PR enables the new AM to accept the list of already running containers provided by the resource manager and to launch only those containers that are part of the job model but not in the running container list.
   
   Changes:
   1. ClientHelper: job submit to RM indicates to keep containers alive across attempts
   2. SamzaYarnAppMasterLifecycle: new AM uses the yarnid-samza id mapping and accepts the list of running containers given by RM and builds its internal state (SamzaApplicationState and YarnAppState) correctly
   3. ContainerProcessManager: removes running containers from the needed processor list prior to placing resource requests
   
   Tests: added unit tests and deployed hello-samza
   
   API changes: None
   
   Usage instructions: to enable AM HA, set the config "yarn.am.high-availability.enabled" to "true"; the default value is "false".
   
   Upgrade instructions: None


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r532152731



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
+        if (samzaProcId != null) {
+          info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
+          samzaAppState.runningProcessors.put(samzaProcId,
+            new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))
+          state.runningProcessors.put(samzaProcId, new YarnContainer(ctr))

Review comment:
       We need to handle the start time & uptime within the `YarnContainer`. It looks like the AM web servlet uses them, and that will break with this change for containers from previous attempts.
   
   We should either document it or handle it depending on the complexity it adds to this PR.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -192,7 +193,8 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()

Review comment:
       After some digging, I realized we will need to handle populating the allocated resources.
   The `StopStreamProcessor` method will error out in the absence of that information.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -413,6 +428,9 @@ public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+        state.processorToExecutionId.put(processorId, containerId);
+      }
 
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);

Review comment:
       The current flow doesn't mark the job as healthy when all the containers are from the previous attempt and there is no need to spin up a new one.
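
To illustrate the gap described above, here is a minimal sketch, assuming a simplified stand-in for the AM's bookkeeping. The class `JobHealthSketch` and its methods are hypothetical and not part of Samza; only the `neededProcessors` / `jobHealthy` decrement-and-check pattern comes from the diff.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the AM's health bookkeeping. */
final class JobHealthSketch {
  final AtomicInteger neededProcessors = new AtomicInteger();
  final AtomicBoolean jobHealthy = new AtomicBoolean(false);

  /**
   * Initializes the counters for a new AM attempt.
   * @param jobModelProcessors processor ids in the current job model
   * @param alreadyRunning processor ids inherited from the previous attempt
   */
  void init(Set<String> jobModelProcessors, Set<String> alreadyRunning) {
    int needed = 0;
    for (String id : jobModelProcessors) {
      if (!alreadyRunning.contains(id)) {
        needed++;
      }
    }
    neededProcessors.set(needed);
    // If health were only set in the launch callback, it would never be set
    // when every processor is already running, so check here as well.
    if (needed == 0) {
      jobHealthy.set(true);
    }
  }

  /** Mirrors the decrement-and-check done on a successful launch. */
  void onLaunchSuccess() {
    if (neededProcessors.decrementAndGet() == 0) {
      jobHealthy.set(true);
    }
  }
}
```

With this shape, an attempt that inherits every container is healthy right after `init`, while an attempt that still needs one container becomes healthy only after its launch callback.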
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -105,6 +105,22 @@
    */
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
+  /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to containers currently running or from previous attempt or previous deploy.
+   *
+   * If # of containers in map is same as current JobModel's containers, and mapping is from previous deploy,
+   * then they will get overwritten by new container incarnations in the current deploy in {@link ContainerProcessManager}.onStreamProcessorLaunchSuccess.
+   * If # of containers in map is lesser, then map entries will get overwritten by current containers and new ones will be added to map.
+   * If # of containers in map is greater, this map is wiped clear in {@link ContainerProcessManager}.start
+   * to avoid having mapping for containers which are not part of the current JobModel.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);

Review comment:
       Since we don't need this information beyond the construction phase, I suggest not updating it in other places.
   
   Ideally, have it wired during construction, although if that isn't straightforward, we can follow up after the initial PR.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza container id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the container locality.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Container {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);
+    metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Execution evironment container id for container {}: {}", entry.getKey(), entry.getValue());

Review comment:
       Same as above: be consistent with the other log messages, replace container with `processor`, and use `executionEnvContainerId` instead.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId

Review comment:
       s/ContainerId/ProcessorId

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId
+ *     Type: set-container-id-execution-id-assignment
+ *     Source: "SamzaContainer-$ContainerId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionContainerIdMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-container-id-execution-id-assignment";

Review comment:
       `set-execution-container-id-mapping` to be consistent with the class name similar to others.

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
##########
@@ -226,6 +227,56 @@ public void run() {
     cpm.stop();
   }
 
+  @Test
+  public void testOnInitAMHighAvailability() throws Exception {
+    configVals.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");

Review comment:
       Seems like you are modifying a static map here, and the remnants of this test are going to impact other tests in the class.
   
   Can we make a copy instead and add the AM-HA configs to it?
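
A minimal sketch of the copy-then-modify approach, assuming a shared read-only base map. `TestConfigSketch`, `BASE_CONFIG`, `amHaConfig` and the `example.key` entry are hypothetical names for illustration; only the config key `yarn.am.high-availability.enabled` comes from the PR description.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical test helper: shared defaults plus per-test copies. */
final class TestConfigSketch {
  // Shared defaults, wrapped as unmodifiable so no test can mutate them.
  static final Map<String, String> BASE_CONFIG;

  static {
    Map<String, String> base = new HashMap<>();
    base.put("example.key", "example-value"); // placeholder entry
    BASE_CONFIG = Collections.unmodifiableMap(base);
  }

  /** Returns a fresh, mutable copy of the defaults with AM HA enabled. */
  static Map<String, String> amHaConfig() {
    Map<String, String> copy = new HashMap<>(BASE_CONFIG);
    copy.put("yarn.am.high-availability.enabled", "true");
    return copy;
  }
}
```

Each test that needs AM HA calls `amHaConfig()` and gets its own map, so the shared defaults are the same before and after the test runs.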

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }

Review comment:
       Seems like a `BiMap` for `processorToExecutionId` is what is needed and should eliminate this piece of code here.
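
As a rough illustration of why a bidirectional map removes the manual inversion loop, here is a stdlib-only sketch. `ProcessorIdBiMapSketch` is a hypothetical stand-in, not Guava's `BiMap`; its `put` silently replaces stale pairings, which is closer to Guava's `forcePut` than to its `put`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal bidirectional map between processor ids and execution ids. */
final class ProcessorIdBiMapSketch {
  private final Map<String, String> processorToExecution = new HashMap<>();
  private final Map<String, String> executionToProcessor = new HashMap<>();

  /** Associates the two ids, dropping stale pairings so both directions stay one-to-one. */
  void put(String processorId, String executionId) {
    String oldExecution = processorToExecution.remove(processorId);
    if (oldExecution != null) {
      executionToProcessor.remove(oldExecution);
    }
    String oldProcessor = executionToProcessor.remove(executionId);
    if (oldProcessor != null) {
      processorToExecution.remove(oldProcessor);
    }
    processorToExecution.put(processorId, executionId);
    executionToProcessor.put(executionId, processorId);
  }

  /** Forward lookup: logical processor id to execution environment container id. */
  String executionIdFor(String processorId) {
    return processorToExecution.get(processorId);
  }

  /** Reverse lookup, which is what the AM needs for containers reported by the RM. */
  String processorIdFor(String executionId) {
    return executionToProcessor.get(executionId);
  }
}
```

The reverse lookup is then a single call on the shared structure rather than a second map rebuilt at each use.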
   

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
+        if (samzaProcId != null) {

Review comment:
       What happens if this is null? Shouldn't non-null be an invariant here?

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -192,7 +193,8 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()

Review comment:
       Ideally, you want to follow the `OnContainersAllocated` code path through, to ensure that all the state is intact when updating the AM with the previous set of running containers.
   
   We also need to ensure that both `YarnAppState` and `SamzaApplicationState` are up to date, on top of the other internal state held by `YarnResourceClusterManager` and `ContainerAllocator`, barring the interim state used during container assignment & startup.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza container id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the container locality.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Container {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);

Review comment:
       nit: `Processor: {} has executionEnvContainerId: {}` to be consistent with other places. I remember Prateek made a whole clean up PR for this :)

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.

Review comment:
       s/persist the container/persist

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId
+ *     Type: set-container-id-execution-id-assignment
+ *     Source: "SamzaContainer-$ContainerId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionContainerIdMapping extends CoordinatorStreamMessage {

Review comment:
       `SetExecutionEnvContainerIdMapping` instead of `SetExecutionContainerIdMapping` to be consistent in usage.

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -147,6 +149,14 @@ private static void run(
         heartbeatMonitor.start();
       }
 
+      if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+        ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+            new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionContainerIdMapping.TYPE));
+        if (executionContainerIdManager != null && execEnvContainerId.isPresent()) {

Review comment:
       We just created the manager above, so why is there a null check on it? The check should be on `execEnvContainerId` instead.
   
   Even then, I don't think you need this check. You can use `Optional::ifPresent` and do the write there.
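
A minimal sketch of the `Optional::ifPresent` form suggested above. `IfPresentSketch` and `IdMappingWriter` are hypothetical stand-ins for the metadata-store-backed manager; only `java.util.Optional` is a real API here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class IfPresentSketch {
  /** Hypothetical stand-in for the manager that persists the id mapping. */
  static final class IdMappingWriter {
    final Map<String, String> written = new HashMap<>();

    void write(String processorId, String execEnvContainerId) {
      written.put(processorId, execEnvContainerId);
    }
  }

  static void persist(IdMappingWriter writer, String processorId, Optional<String> execEnvContainerId) {
    // The writer is constructed by the caller, so it needs no null check;
    // the Optional alone decides whether anything is written.
    execEnvContainerId.ifPresent(id -> writer.write(processorId, id));
  }
}
```

When the optional is empty the lambda never runs, so nothing is written and no explicit `isPresent()` branch is needed.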







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r536479945



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -236,11 +236,20 @@ public void start() {
       diagnosticsManager.get().start();
     }
 
+    if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      LOG.info(
+          "Set neededProcessors prior to starting clusterResourceManager because it gets running containres from prev attempts in AM HA.");
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }
+
     LOG.info("Starting the cluster resource manager");
     clusterResourceManager.start();
 
-    state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
-    state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    if (!jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }

Review comment:
       If I am reading this right, we do the same thing whether or not AM HA is enabled, except for the additional log line.
   
   1. Why can't we move the initializations before `clusterResourceManager.start()` and consolidate this?
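
A sketch of the consolidated ordering the comment asks about, assuming the counters can always be set before the resource manager starts. `StartOrderSketch` and its `start` signature are hypothetical; the counter names follow the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified view of the start sequence. */
final class StartOrderSketch {
  final AtomicInteger processorCount = new AtomicInteger();
  final AtomicInteger neededProcessors = new AtomicInteger();

  void start(int jobModelContainerCount, Runnable startResourceManager) {
    // Set both counters unconditionally, so no AM-HA flag check is required.
    processorCount.set(jobModelContainerCount);
    neededProcessors.set(jobModelContainerCount);
    // With AM HA the resource manager may report already-running containers
    // during start and decrement neededProcessors, so it must run second.
    startResourceManager.run();
  }
}
```

Without AM HA the resource manager does not touch the counters during start, so the same ordering should serve both cases.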

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -403,29 +412,23 @@ public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatuses) {
 
   @Override
   public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
-    String containerId = resource.getContainerId();
-    String containerHost = resource.getHost();
-
-    // 1. Obtain the processor ID for the pending container on this resource.
-    String processorId = getPendingProcessorId(containerId);
-    LOG.info("Successfully started Processor ID: {} on Container ID: {} on host: {}",
-        processorId, containerId, containerHost);
-
-    // 2. Remove the container from the pending buffer and add it to the running buffer. Additionally, update the
-    // job-health metric.
-    if (processorId != null) {
-      LOG.info("Moving Processor ID: {} on Container ID: {} on host: {} from pending to running state.",
-          processorId, containerId, containerHost);
-      state.pendingProcessors.remove(processorId);
-      state.runningProcessors.put(processorId, resource);
-      if (state.neededProcessors.decrementAndGet() == 0) {
-        state.jobHealthy.set(true);
-      }
-      containerManager.handleContainerLaunchSuccess(processorId, containerHost);
-    } else {
-      LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. " +
-          "Ignoring invalid/redundant notification.", containerId, containerHost);
+    // Scenario 1: processor belongs to current attempt of the job.
+    // This means, the current AM had placed a request for the processor
+    // and hence containerId should be found in the pendingProcessor map
+    if (state.pendingProcessors.containsValue(resource)) {
+      handleNewProcessorLaunchSuccess(resource);
+      return;
+    }
+    // Scenario 2: Due to AM HA, processor could belong to the previous attempt of the job.
+    // This means, the current AM did not place a request for the processor as it was already running.
+    // Hence it will be in the runningProcessors map and not in the pendingProcessor Map
+    if (jobConfig.getApplicationMasterHighAvailabilityEnabled() && state.runningProcessors.containsValue(resource)) {
+      handleRunningProcessorLaunchSuccess(resource);
+      return;
     }
+
+    LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
+        + "Ignoring invalid/redundant notification.", resource.getContainerId(), resource.getHost());

Review comment:
       I would suggest using
   ```
   if ()
   else if()
   else 
   ```
   instead, as it eliminates the need for multiple control statements (`return`) across the function and makes it more readable. Multiple returns become tedious, especially if this function evolves and starts having updates to local state.
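
A minimal sketch of the suggested single-exit shape, assuming container ids are tracked in plain sets. `LaunchDispatchSketch` and the returned labels are hypothetical; the real method works on `SamzaResource` and delegates to handler methods.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified dispatch for a launch-success notification. */
final class LaunchDispatchSketch {
  final Set<String> pending = new HashSet<>();
  final Set<String> running = new HashSet<>();
  final boolean amHaEnabled;

  LaunchDispatchSketch(boolean amHaEnabled) {
    this.amHaEnabled = amHaEnabled;
  }

  /** Returns a label for the branch taken; one exit point, no early returns. */
  String onLaunchSuccess(String containerId) {
    String outcome;
    if (pending.contains(containerId)) {
      // Scenario 1: the current AM requested this container.
      pending.remove(containerId);
      running.add(containerId);
      outcome = "new";
    } else if (amHaEnabled && running.contains(containerId)) {
      // Scenario 2: the container was inherited from a previous attempt.
      outcome = "previous-attempt";
    } else {
      // Neither pending nor known as running: invalid or redundant notification.
      outcome = "ignored";
    }
    return outcome;
  }
}
```

Any state update added later can sit after the chain and apply to every branch, which is harder to guarantee with scattered `return` statements.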

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -44,25 +45,26 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
   var validResourceRequest = true
   var shutdownMessage: String = null
   var webApp: SamzaYarnAppMasterService = null
-  def onInit() {
+  def onInit(): util.Map[ContainerId, YarnContainer] = {
     val host = state.nodeHost
     val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort))
 
     // validate that the YARN cluster can handle our container resource requirements
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    val previousAttemptContainers = new HashMap[ContainerId, YarnContainer]()
     if (isApplicationMasterHighAvailabilityEnabled) {
       val yarnIdToprocIdMap = new HashMap[String, String]()
       samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
       response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
         val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
-        if (samzaProcId != null) {
-          info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
-          samzaAppState.runningProcessors.put(samzaProcId,
-            new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))
-          state.runningProcessors.put(samzaProcId, new YarnContainer(ctr))
-        }
+        info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
+        samzaAppState.runningProcessors.put(samzaProcId,
+          new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))

Review comment:
       You don't need to populate `runningProcessors` in the new flow, right? You could populate `pendingProcessors` instead, since with the new flow `YarnClusterResourceManager` invokes the `clusterManagerCallback.onStreamProcessorLaunchSuccess` callback.
   By doing so, you don't need to modify any of the `ContainerPlacementManager`'s code, and the job health, `pendingProcessor` decrement and `runningProcessor` updates should come for free.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -653,6 +656,20 @@ private String getPendingProcessorId(String containerId) {
     return null;
   }
 
+  /**
+   * Obtains the ID of the processor which is already running at a resource.
+   * @param resource where the processor is running
+   * @return the logical processorId of the processor (e.g., 0, 1, 2 ..)
+   */
+  private String getRunningProcessorId(SamzaResource resource) {
+    return state.runningProcessors.entrySet()
+        .stream()
+        .filter(e -> e.getValue().equals(resource))
+        .map(Map.Entry::getKey)
+        .findFirst()
+        .orElse(null);
+  }
+

Review comment:
       Why not use `getRunningProcessor` below, by just calling `getRunningProcessorId(resource.getContainerId())` instead?
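
For illustration, a lookup keyed on the container id rather than on resource equality might look like the sketch below. `RunningProcessorLookupSketch`, its nested `Resource` class and `processorIdFor` are hypothetical; the actual helper referred to in the comment is not shown in this diff.

```java
import java.util.Map;

final class RunningProcessorLookupSketch {
  /** Hypothetical minimal resource: only the container id matters here. */
  static final class Resource {
    final String containerId;

    Resource(String containerId) {
      this.containerId = containerId;
    }
  }

  /** Returns the processor id running in the given container, or null if none matches. */
  static String processorIdFor(Map<String, Resource> runningProcessors, String containerId) {
    return runningProcessors.entrySet().stream()
        .filter(e -> e.getValue().containerId.equals(containerId))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElse(null);
  }
}
```

Matching on the id avoids relying on how `equals` is defined for the resource type, and lets one helper serve callers that hold only a container id.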







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r536450494



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -192,7 +193,8 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()

Review comment:
       Updated to invoke OnContainerStarted instead of OnContainersAllocated, as that seemed to be the more appropriate one (it sets the job healthy, etc.).
   Fixed the NPE arising from previous-attempt containers not being present in the allocatedResources map.







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r537826816



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -254,10 +257,10 @@ public void start() {
     });
     if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
       // don't request resource for container that is already running
-      state.runningProcessors.keySet().forEach(containerId -> {
-        processorToHost.remove(containerId);
-        state.neededProcessors.decrementAndGet();
-      });
+      LOG.info("have in samza app state: " + state.runningProcessors.size() + " running processors");
+      LOG.info("have in processorToHost " + processorToHost.size());
+      state.runningProcessors.keySet().forEach(processorToHost::remove);
+      LOG.info("after removing running processors : have in processorToHost " + processorToHost.size());

Review comment:
       Can we consolidate the log statements? I guess they were used for testing.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -420,6 +423,7 @@ public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
       if (state.neededProcessors.decrementAndGet() == 0) {
+        LOG.info("Manasa#: setting job healhty to true");

Review comment:
       remove the log :)







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r533083809



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
+        if (samzaProcId != null) {
+          info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
+          samzaAppState.runningProcessors.put(samzaProcId,
+            new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))
+          state.runningProcessors.put(samzaProcId, new YarnContainer(ctr))

Review comment:
       `YarnAppState` running processors has a map from `YarnContainerId` -> `YarnContainer`. This should be `ctr.getId.toString()` instead of `samzaProcId`.







[GitHub] [samza] mynameborat merged pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1448:
URL: https://github.com/apache/samza/pull/1448


   





[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r532984363



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -105,6 +105,22 @@
    */
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
+  /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to containers currently running or from previous attempt or previous deploy.
+   *
+   * If # of containers in map is same as current JobModel's containers, and mapping is from previous deploy,
+   * then they will get overwritten by new container incarnations in the current deploy in {@link ContainerProcessManager}.onStreamProcessorLaunchSuccess.
+   * If # of containers in map is lesser, then map entries will get overwritten by current containers and new ones will be added to map.
+   * If # of containers in map is greater, this map is wiped clear in {@link ContainerProcessManager}.start
+   * to avoid having mapping for containers which are not part of the current JobModel.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);

Review comment:
       Agreed.
   
   I did consider not persisting this mapping in the state throughout the lifecycle of the AM, but decided to go with it because the other option, passing the mapping/metadata store to the lifecycle, would require changing some interfaces.
   However, after our offline sync, it does sound better to update the interface than to persist this map.
   
   Created a JIRA to follow up: SAMZA-2607.
   Also removed the updates to this mapping in the state to keep it fresh in CPM.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r537659102



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -236,11 +236,20 @@ public void start() {
       diagnosticsManager.get().start();
     }
 
+    if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      LOG.info(
+          "Set neededProcessors prior to starting clusterResourceManager because it gets running containres from prev attempts in AM HA.");
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }
+
     LOG.info("Starting the cluster resource manager");
     clusterResourceManager.start();
 
-    state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
-    state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    if (!jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }

Review comment:
       (I had written the response but forgot to submit it.)
   The reason for this flow is to keep the non-AM-HA path exactly the same as before.
   The big difference is setting `neededProcessors` prior to `ClusterResourceManager.start()`, since `start()` will invoke launch success, which in turn will decrement `neededProcessors` and set the job healthy accordingly.
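   To illustrate why the ordering matters, here is a small sketch using a plain counter. `NeededProcessorsOrderingSketch` and `startUp` are hypothetical names, and the loop merely stands in for the resource manager reporting launch success for containers from a previous attempt during start:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class NeededProcessorsOrderingSketch {
  // Returns the value of the needed-processor counter once startup has finished.
  static int startUp(int containersInJobModel, int alreadyRunning, boolean setBeforeStart) {
    AtomicInteger neededProcessors = new AtomicInteger(0);

    if (setBeforeStart) {
      neededProcessors.set(containersInJobModel);
    }

    // Stand-in for the resource manager start: each already running container
    // triggers a launch-success callback, which decrements the counter.
    for (int i = 0; i < alreadyRunning; i++) {
      neededProcessors.decrementAndGet();
    }

    if (!setBeforeStart) {
      // Setting the counter after start overwrites the decrements made above.
      neededProcessors.set(containersInJobModel);
    }
    return neededProcessors.get();
  }
}
```

   With 3 containers in the job model and 2 already running, setting the counter before start leaves 1 needed processor, whereas setting it afterwards resets it to 3 and loses the two decrements.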







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r537618206



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*

Review comment:
       This either needs a merge from the latest master, or it is an extra file that needs to be removed, since it got renamed in master and the merge is keeping it as is.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -236,11 +236,20 @@ public void start() {
       diagnosticsManager.get().start();
     }
 
+    if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      LOG.info(
+          "Set neededProcessors prior to starting clusterResourceManager because it gets running containres from prev attempts in AM HA.");
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }
+
     LOG.info("Starting the cluster resource manager");
     clusterResourceManager.start();
 
-    state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
-    state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    if (!jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+      state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
+      state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
+    }

Review comment:
       Did you miss this comment?







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r537620505



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }

Review comment:
       Guess you are leaving it as is. Can you respond and close the comment?







[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r536481015



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -653,6 +656,20 @@ private String getPendingProcessorId(String containerId) {
     return null;
   }
 
+  /**
+   * Obtains the ID of the processor which is already running at a resource.
+   * @param resource where the processor is running
+   * @return the logical processorId of the processor (e.g., 0, 1, 2 ..)
+   */
+  private String getRunningProcessorId(SamzaResource resource) {
+    return state.runningProcessors.entrySet()
+        .stream()
+        .filter(e -> e.getValue().equals(resource))
+        .map(Map.Entry::getKey)
+        .findFirst()
+        .orElse(null);
+  }
+

Review comment:
       Why not use `getRunningProcessor` below by just using `getRunningProcessorId(resource.getContainerId())` instead?







[GitHub] [samza] mynameborat commented on pull request #1448: SAMZA-2606: container orchestration for AM HA

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1448:
URL: https://github.com/apache/samza/pull/1448#issuecomment-739110018


   By the way, I really like leveraging the CPM `onStreamProcessorLaunchSuccess`. It makes it clean & consistent w.r.t updating state and orchestration.

