You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/09/10 16:48:39 UTC
samza git commit: SAMZA-1824: Handle errors from the async-NMClient when launching containers
Repository: samza
Updated Branches:
refs/heads/master ed621b269 -> 666835186
SAMZA-1824: Handle errors from the async-NMClient when launching containers
- Updated internal state that tracks "pending" containers correctly
- Refactored `YarnClusterResourceManager` for testability. Added a unit test
Author: Jagadish <jv...@linkedin.com>
Reviewers: Jake Maes <jm...@linkedin.com>
Closes #615 from vjagadish1989/container-launch-error
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66683518
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66683518
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66683518
Branch: refs/heads/master
Commit: 6668351867a0548a944660f72f953baf74d35707
Parents: ed621b2
Author: Jagadish <jv...@linkedin.com>
Authored: Mon Sep 10 09:48:35 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Sep 10 09:48:35 2018 -0700
----------------------------------------------------------------------
.../job/yarn/YarnClusterResourceManager.java | 41 ++++++----
.../yarn/TestYarnClusterResourceManager.java | 81 ++++++++++++++++++++
2 files changed, 107 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 6f175ea..53b61d9 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -30,11 +30,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -114,17 +112,30 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<ContainerId, Container> containersPendingStartup = new ConcurrentHashMap<>();
-
private final SamzaAppMasterMetrics metrics;
- final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private final Object lock = new Object();
private final NMClientAsync nmClientAsync;
private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
private final Config config;
+ /**
+ * Package-private constructor that receives already-built collaborators instead of creating them,
+ * so callers such as unit tests can supply mocks for the AM/NM clients, app state and services.
+ */
+ YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, Callback callback,
+ YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service,
+ SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) {
+ super(callback);
+ // clients used to talk to the Yarn RM and NMs
+ this.amClient = amClientAsync;
+ this.nmClientAsync = nmClientAsync;
+ // application-master state, lifecycle, services and metrics
+ this.state = yarnAppState;
+ this.lifecycle = lifecycle;
+ this.service = service;
+ this.metrics = metrics;
+ // configuration views
+ this.config = config;
+ this.yarnConfiguration = yarnConfiguration;
+ this.yarnConfig = new YarnConfig(config);
+ }
+
/**
* Creates an YarnClusterResourceManager from config, a jobModelReader and a callback.
* @param config to instantiate the container manager with
@@ -513,18 +524,20 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
}
+ /**
+ * Handles a failed launch request reported by the {@link NMClientAsync}.
+ * If the Yarn container is still tracked as pending, it is removed from the pending set and the
+ * launch failure is forwarded to the cluster-manager callback; otherwise the notification is only logged.
+ *
+ * @param yarnContainerId the Yarn container whose launch failed
+ * @param t the error reported by the NM client
+ */
@Override
- public void onStartContainerError(ContainerId containerId, Throwable t) {
- log.error(String.format("Container: %s could not start.", containerId), t);
+ public void onStartContainerError(ContainerId yarnContainerId, Throwable t) {
+ log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t);
- Container container = containersPendingStartup.remove(containerId);
+ String samzaContainerId = getPendingSamzaContainerId(yarnContainerId);
+ // The lookup above and the removal below are separate steps, so the pending entry may already be
+ // gone when we remove it; check the removed value rather than dereferencing a possibly-null container.
+ YarnContainer container = samzaContainerId == null ? null : state.pendingYarnContainers.remove(samzaContainerId);
- if (container != null) {
- SamzaResource resource = new SamzaResource(container.getResource().getVirtualCores(),
- container.getResource().getMemory(), container.getNodeId().getHost(), containerId.toString());
- log.info("Invoking failure callback for container: {}", containerId);
+ if (container != null) {
+ log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId);
+ SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+ container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString());
+ log.info("Invoking failure callback for container: {}", yarnContainerId);
clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
} else {
- log.info("Got an invalid notification for container: {}", containerId);
+ log.info("Got an invalid notification for container: {}", yarnContainerId);
}
}
@@ -680,7 +693,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
}
-
/**
* Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
*
@@ -726,5 +738,4 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
new file mode 100644
index 0000000..7503c5b
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.config.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+public class TestYarnClusterResourceManager {
+
+ /**
+ * Builds the Yarn {@link ContainerId} with the given sequence number inside a fixed application attempt.
+ * Each call returns a new, but equal, instance.
+ */
+ private static ContainerId newYarnContainerId(long containerNumber) {
+ return ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000L, 1), 1), containerNumber);
+ }
+
+ /**
+ * Wraps a Yarn container with the given id, placed on host1 with 1024 MB and 1 vcore, in a {@link YarnContainer}.
+ */
+ private static YarnContainer newYarnContainer(ContainerId yarnContainerId) {
+ return new YarnContainer(Container.newInstance(yarnContainerId,
+ NodeId.newInstance("host1", 8088), "http://host1",
+ Resource.newInstance(1024, 1), Priority.newInstance(1),
+ Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service")));
+ }
+
+ /**
+ * Constructs the resource manager under test, mocking every collaborator except the app state and the callback.
+ */
+ private static YarnClusterResourceManager newResourceManager(YarnAppState yarnAppState,
+ ClusterResourceManager.Callback callback) {
+ return new YarnClusterResourceManager(mock(AMRMClientAsync.class), mock(NMClientAsync.class), callback,
+ yarnAppState, mock(SamzaYarnAppMasterLifecycle.class), mock(SamzaYarnAppMasterService.class),
+ mock(SamzaAppMasterMetrics.class), mock(YarnConfiguration.class), mock(Config.class));
+ }
+
+ @Test
+ public void testErrorInStartContainerShouldUpdateState() {
+ YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+ ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+ YarnClusterResourceManager yarnClusterResourceManager = newResourceManager(yarnAppState, callback);
+
+ // Samza container "1" is waiting for Yarn container #1 to launch
+ yarnAppState.pendingYarnContainers.put("1", newYarnContainer(newYarnContainerId(1)));
+
+ yarnClusterResourceManager.start();
+ assertEquals(1, yarnAppState.pendingYarnContainers.size());
+
+ // report the failure with a freshly built (equal, not identical) id so the lookup is exercised by equality
+ yarnClusterResourceManager.onStartContainerError(newYarnContainerId(1), new Exception());
+
+ assertEquals(0, yarnAppState.pendingYarnContainers.size());
+ verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
+ }
+
+ @Test
+ public void testErrorInStartContainerForUnknownContainerShouldBeIgnored() {
+ YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+ ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+ YarnClusterResourceManager yarnClusterResourceManager = newResourceManager(yarnAppState, callback);
+
+ yarnAppState.pendingYarnContainers.put("1", newYarnContainer(newYarnContainerId(1)));
+
+ // Yarn container #2 was never pending, so the notification must leave state untouched and not fire the callback
+ yarnClusterResourceManager.onStartContainerError(newYarnContainerId(2), new Exception());
+
+ assertEquals(1, yarnAppState.pendingYarnContainers.size());
+ verify(callback, never()).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
+ }
+}
\ No newline at end of file