Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/11/30 13:28:02 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1448: SAMZA-2606: container orchestration for AM HA

mynameborat commented on a change in pull request #1448:
URL: https://github.com/apache/samza/pull/1448#discussion_r532152731



##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
+        if (samzaProcId != null) {
+          info("Received container from previous attempt with samza processor id %s and yarn container id %s" format(samzaProcId, ctr.getId.toString))
+          samzaAppState.runningProcessors.put(samzaProcId,
+            new SamzaResource(ctr.getResource.getVirtualCores, ctr.getResource.getMemory, ctr.getNodeId.getHost, ctr.getId.toString))
+          state.runningProcessors.put(samzaProcId, new YarnContainer(ctr))

Review comment:
       We need to handle the start time & uptime within the `YarnContainer`. It looks like the AM web servlet uses them, so with this change the servlet will be broken for containers from previous attempts.
   
   We should either document it or handle it depending on the complexity it adds to this PR.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -192,7 +193,8 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()

Review comment:
       After some digging, I realized we will also need to populate the allocated resources.
   The `StopStreamProcessor` method will error out in the absence of that information.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -413,6 +428,9 @@ public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
+        state.processorToExecutionId.put(processorId, containerId);
+      }
 
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);

Review comment:
       The current flow doesn't set the job as healthy when all the containers are from the previous attempt and there is no need to spin up a new one.
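
A minimal sketch of the gap described above, assuming the needed-processor counter is decremented for containers recovered from the previous attempt. `JobHealthSketch` and `onProcessorsRecovered` are hypothetical names for illustration, not part of the PR; only the `decrementAndGet() == 0` check mirrors the hunk.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the health-related fields of the application state.
final class JobHealthSketch {
  final AtomicInteger neededProcessors;
  final AtomicBoolean jobHealthy = new AtomicBoolean(false);

  JobHealthSketch(int processorCount) {
    this.neededProcessors = new AtomicInteger(processorCount);
  }

  // Assumed startup hook: account for processors already running from the previous attempt.
  // If nothing is left to launch, the job is marked healthy right away.
  void onProcessorsRecovered(int recoveredCount) {
    if (neededProcessors.addAndGet(-recoveredCount) == 0) {
      jobHealthy.set(true);
    }
  }

  // Mirrors the existing launch-success path shown in the hunk.
  void onLaunchSuccess() {
    if (neededProcessors.decrementAndGet() == 0) {
      jobHealthy.set(true);
    }
  }
}
```

Without a check on the recovery path, `jobHealthy` would only ever be set from `onLaunchSuccess`, which is never called when every container is recovered.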
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -105,6 +105,22 @@
    */
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0);
 
+  /**
+   * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processId to execution id mapping (if any) present in the coordinator stream.
+   * This could correspond to containers currently running or from previous attempt or previous deploy.
+   *
+   * If # of containers in map is same as current JobModel's containers, and mapping is from previous deploy,
+   * then they will get overwritten by new container incarnations in the current deploy in {@link ContainerProcessManager}.onStreamProcessorLaunchSuccess.
+   * If # of containers in map is lesser, then map entries will get overwritten by current containers and new ones will be added to map.
+   * If # of containers in map is greater, this map is wiped clear in {@link ContainerProcessManager}.start
+   * to avoid having mapping for containers which are not part of the current JobModel.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0);

Review comment:
       Since we don't need this information beyond the construction phase, I suggest not updating it in other places.
   
   Ideally, have it wired during construction, although if that doesn't seem straightforward, we can follow up after the initial PR.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza container id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the container locality.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Container {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);
+    metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Execution evironment container id for container {}: {}", entry.getKey(), entry.getValue());

Review comment:
       Same as the comment on the `LOG.info` message in `writeExecutionEnvironmentContainerIdMapping`: be consistent with the log message, replace container with `processor`, and use `executionEnvContainerId` instead.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId

Review comment:
       s/ContainerId/ProcessorId

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId
+ *     Type: set-container-id-execution-id-assignment
+ *     Source: "SamzaContainer-$ContainerId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionContainerIdMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-container-id-execution-id-assignment";

Review comment:
       `set-execution-container-id-mapping` to be consistent with the class name similar to others.

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
##########
@@ -226,6 +227,56 @@ public void run() {
     cpm.stop();
   }
 
+  @Test
+  public void testOnInitAMHighAvailability() throws Exception {
+    configVals.put(JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED, "true");

Review comment:
       Seems like you are modifying a static map here, and the remnants of this test are going to impact other tests in the class.
   
   Can we make a copy instead and add AM-HA configs?
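
A rough sketch of the copy-then-modify approach, assuming the shared map is a plain `Map<String, String>`. `SHARED_CONFIG` and the key `"hypothetical.am.ha.enabled"` are placeholders; the real test would use `configVals` and `JobConfig.YARN_AM_HIGH_AVAILABILITY_ENABLED`.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigCopySketch {
  // Placeholder for the static map shared by every test in the class.
  static final Map<String, String> SHARED_CONFIG = new HashMap<>();

  static {
    SHARED_CONFIG.put("job.name", "test-job");
  }

  // Builds a per-test copy so the AM-HA flag never leaks into the shared map.
  static Map<String, String> withAmHighAvailability() {
    Map<String, String> copy = new HashMap<>(SHARED_CONFIG);
    copy.put("hypothetical.am.ha.enabled", "true"); // placeholder key
    return copy;
  }
}
```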

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }

Review comment:
       Seems like a `BiMap` for `processorToExecutionId` is what is needed; it should eliminate this piece of code.
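
To illustrate the idea without pulling in Guava, here is a stdlib-only sketch of a two-way id map. `TwoWayIdMap` is a hypothetical class; in the PR itself Guava's `HashBiMap` and its `inverse()` view would presumably play this role, and note that Guava's `put` rejects a value already bound to another key, whereas this sketch silently rebinds it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-way map: processor id <-> execution environment container id.
final class TwoWayIdMap {
  private final Map<String, String> processorToExecution = new HashMap<>();
  private final Map<String, String> executionToProcessor = new HashMap<>();

  // Keeps both directions consistent, dropping any stale binding on either side.
  synchronized void put(String processorId, String executionId) {
    String oldExecution = processorToExecution.put(processorId, executionId);
    if (oldExecution != null) {
      executionToProcessor.remove(oldExecution);
    }
    String oldProcessor = executionToProcessor.put(executionId, processorId);
    if (oldProcessor != null && !oldProcessor.equals(processorId)) {
      processorToExecution.remove(oldProcessor);
    }
  }

  synchronized String executionIdOf(String processorId) {
    return processorToExecution.get(processorId);
  }

  // Reverse lookup, replacing the hand-built yarnIdToprocIdMap in the hunk.
  synchronized String processorIdOf(String executionId) {
    return executionToProcessor.get(executionId);
  }
}
```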
   

##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala
##########
@@ -48,6 +52,19 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaApp
     val maxCapability = response.getMaximumResourceCapability
     val maxMem = maxCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
+    if (isApplicationMasterHighAvailabilityEnabled) {
+      val yarnIdToprocIdMap = new HashMap[String, String]()
+      samzaAppState.processorToExecutionId.asScala foreach { entry => yarnIdToprocIdMap.put(entry._2, entry._1) }
+      response.getContainersFromPreviousAttempts.asScala foreach { (ctr: Container) =>
+        val samzaProcId = yarnIdToprocIdMap.get(ctr.getId.toString)
+        if (samzaProcId != null) {

Review comment:
       What happens if this is null? Shouldn't non-null be an invariant here?
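
One possible way to make the null case explicit, sketched under the assumption that unmatched containers should be collected so the caller can decide what to do with them (fail fast, release them, or just log). `PreviousAttemptMatcher` and its fields are hypothetical names, and ids are plain strings here rather than YARN `Container` objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits previous-attempt containers into known and unknown ones.
final class PreviousAttemptMatcher {
  final Map<String, String> recovered = new HashMap<>(); // processor id -> container id
  final List<String> unmatched = new ArrayList<>();      // container ids with no known processor

  PreviousAttemptMatcher(Map<String, String> executionIdToProcessorId,
                         List<String> previousContainerIds) {
    for (String containerId : previousContainerIds) {
      String processorId = executionIdToProcessorId.get(containerId);
      if (processorId == null) {
        // The case the review asks about: surfaced to the caller instead of skipped silently.
        unmatched.add(containerId);
      } else {
        recovered.put(processorId, containerId);
      }
    }
  }
}
```

If non-null really is an invariant, the caller could throw when `unmatched` is non-empty instead of continuing.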

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -192,7 +193,8 @@ public YarnClusterResourceManager(Config config, JobModelManager jobModelManager
         clusterManagerConfig.getNumCores(),
         samzaAppState,
         state,
-        amClient
+        amClient,
+        new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()

Review comment:
       Ideally, you want to follow through the `OnContainersAllocated` code path to ensure that all the state is intact when updating AM with the previous set of running containers.
   
   We also need to ensure both `YarnAppState` and `SamzaApplicationState` are up to date, on top of the other internal state held by `YarnClusterResourceManager` and `ContainerAllocator`, barring the state that is interim during container assignment & startup.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetExecutionContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used for persisting and reading the execution environment container id information into the metadata store.
+ * Processor id (logical Samza container id) to execution environment container id (ex: yarn container id) is written.
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write the container locality.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null.");
+    LOG.info("Container {} has executionEnvContainerId as {}", processorId, executionEnvContainerId);

Review comment:
       nit: `Processor: {} has executionEnvContainerId: {}` to be consistent with other places. I remember Prateek made a whole cleanup PR for this :)

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.

Review comment:
       s/persist the container/persist

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionContainerIdMapping.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerIdExecutionContainerIdMapping is used internally by the Samza framework to
+ * persist the container processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId
+ *     Type: set-container-id-execution-id-assignment
+ *     Source: "SamzaContainer-$ContainerId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionContainerIdMapping extends CoordinatorStreamMessage {

Review comment:
       `SetExecutionEnvContainerIdMapping` instead of `SetExecutionContainerIdMapping` to be consistent in usage.

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -147,6 +149,14 @@ private static void run(
         heartbeatMonitor.start();
       }
 
+      if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+        ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
+            new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionContainerIdMapping.TYPE));
+        if (executionContainerIdManager != null && execEnvContainerId.isPresent()) {

Review comment:
       We just created the manager above, so why this null check? The check should be on `execEnvContainerId` instead.
   
   I still think you don't need this check. You can use `Optional::ifPresent` and do the write inside it.
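
A small sketch of the `Optional::ifPresent` form, assuming `execEnvContainerId` is an `Optional<String>`. `IfPresentSketch` and its in-memory `store` are hypothetical stand-ins for the manager and its metadata-store write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class IfPresentSketch {
  // Hypothetical stand-in for the metadata store behind the manager.
  final Map<String, String> store = new HashMap<>();

  // Writes the mapping only when an execution environment container id is available;
  // no explicit isPresent() or null check is needed.
  void record(String processorId, Optional<String> execEnvContainerId) {
    execEnvContainerId.ifPresent(id -> store.put(processorId, id));
  }
}
```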



