You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/09/10 16:48:39 UTC

samza git commit: SAMZA-1824: Handle errors from the async-NMClient when launching containers

Repository: samza
Updated Branches:
  refs/heads/master ed621b269 -> 666835186


SAMZA-1824: Handle errors from the async-NMClient when launching containers

- Correctly updated the internal state that tracks "pending" containers
- Refactored `YarnClusterResourceManager` for testability and added a unit test

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jake Maes <jm...@linkedin.com>

Closes #615 from vjagadish1989/container-launch-error


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66683518
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66683518
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66683518

Branch: refs/heads/master
Commit: 6668351867a0548a944660f72f953baf74d35707
Parents: ed621b2
Author: Jagadish <jv...@linkedin.com>
Authored: Mon Sep 10 09:48:35 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Sep 10 09:48:35 2018 -0700

----------------------------------------------------------------------
 .../job/yarn/YarnClusterResourceManager.java    | 41 ++++++----
 .../yarn/TestYarnClusterResourceManager.java    | 81 ++++++++++++++++++++
 2 files changed, 107 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 6f175ea..53b61d9 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -30,11 +30,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -114,17 +112,30 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
 
-  private final ConcurrentHashMap<ContainerId, Container> containersPendingStartup = new ConcurrentHashMap<>();
-
   private final SamzaAppMasterMetrics metrics;
 
-  final AtomicBoolean started = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
   private final Object lock = new Object();
   private final NMClientAsync nmClientAsync;
 
   private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
   private final Config config;
 
+  YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, Callback callback,
+      YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service,
+      SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) {
+    super(callback);
+    this.yarnConfiguration  = yarnConfiguration;
+    this.metrics = metrics;
+    this.yarnConfig = new YarnConfig(config);
+    this.config = config;
+    this.amClient = amClientAsync;
+    this.state = yarnAppState;
+    this.lifecycle = lifecycle;
+    this.service = service;
+    this.nmClientAsync = nmClientAsync;
+  }
+
   /**
    * Creates an YarnClusterResourceManager from config, a jobModelReader and a callback.
    * @param config to instantiate the container manager with
@@ -513,18 +524,20 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   }
 
   @Override
-  public void onStartContainerError(ContainerId containerId, Throwable t) {
-    log.error(String.format("Container: %s could not start.", containerId), t);
+  public void onStartContainerError(ContainerId yarnContainerId, Throwable t) {
+    log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t);
 
-    Container container = containersPendingStartup.remove(containerId);
+    String samzaContainerId = getPendingSamzaContainerId(yarnContainerId);
 
-    if (container != null) {
-      SamzaResource resource = new SamzaResource(container.getResource().getVirtualCores(),
-          container.getResource().getMemory(), container.getNodeId().getHost(), containerId.toString());
-      log.info("Invoking failure callback for container: {}", containerId);
+    if (samzaContainerId != null) {
+      YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
+      log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId);
+      SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+          container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString());
+      log.info("Invoking failure callback for container: {}", yarnContainerId);
       clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
     } else {
-      log.info("Got an invalid notification for container: {}", containerId);
+      log.info("Got an invalid notification for container: {}", yarnContainerId);
     }
   }
 
@@ -680,7 +693,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
   }
 
-
   /**
    * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
    *
@@ -726,5 +738,4 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     return null;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
new file mode 100644
index 0000000..7503c5b
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.config.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+public class TestYarnClusterResourceManager {
+
+  @Test
+  public void testErrorInStartContainerShouldUpdateState() {
+    // create mocks
+    final int samzaContainerId = 1;
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    yarnAppState.pendingYarnContainers.put(String.valueOf(samzaContainerId),
+        new YarnContainer(Container.newInstance(
+            ContainerId.newContainerId(
+                ApplicationAttemptId.newInstance(
+                    ApplicationId.newInstance(10000l, 1), 1), 1),
+            NodeId.newInstance("host1", 8088), "http://host1",
+            Resource.newInstance(1024, 1), Priority.newInstance(1),
+            Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service"))));
+
+    yarnClusterResourceManager.start();
+    assertEquals(1, yarnAppState.pendingYarnContainers.size());
+
+    yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(10000l, 1), 1), 1),
+        new Exception());
+
+    assertEquals(0, yarnAppState.pendingYarnContainers.size());
+    verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
+  }
+}
\ No newline at end of file