You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/02 20:17:23 UTC
samza git commit: SAMZA-1528: Change ClusterResourceManager to use the async NMClient
Repository: samza
Updated Branches:
refs/heads/master 93219c78d -> 882f61a69
SAMZA-1528: Change ClusterResourceManager to use the async NMClient
- Rewrite container handling to be asynchronous
- Verified various failure scenarios using unit tests and deployments of a local Samza job.
Author: Jagadish <jv...@linkedin.com>
Author: Fred Ji <ha...@gmail.com>
Author: Srinivasulu Punuru <sp...@linkedin.com>
Reviewers: Jacob Maes <jm...@linkedin.com>, Xinyu Liu <xi...@gmail.com>
Closes #380 from vjagadish1989/cluster-mgr-refactor1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/882f61a6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/882f61a6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/882f61a6
Branch: refs/heads/master
Commit: 882f61a6925d3950e408ec8e202c0e8b6cdff450
Parents: 93219c7
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Jan 2 12:16:51 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Jan 2 12:16:51 2018 -0800
----------------------------------------------------------------------
.../AbstractContainerAllocator.java | 17 +-
.../clustermanager/ClusterResourceManager.java | 26 +-
.../clustermanager/ContainerProcessManager.java | 74 ++++-
.../clustermanager/SamzaApplicationState.java | 8 +-
.../samza/clustermanager/SamzaResource.java | 8 +
.../samza/operators/impl/OperatorImpl.java | 16 +-
.../MockClusterResourceManager.java | 19 +-
.../MockClusterResourceManagerCallback.java | 10 +
.../clustermanager/TestContainerAllocator.java | 55 ----
.../TestContainerProcessManager.java | 101 +++---
.../TestHostAwareContainerAllocator.java | 56 ----
.../samza/operators/impl/TestOperatorImpl.java | 1 +
.../org/apache/samza/job/yarn/YarnAppState.java | 4 +-
.../job/yarn/YarnClusterResourceManager.java | 310 +++++++++++++++++--
.../samza/job/yarn/YarnContainerRunner.java | 272 ----------------
15 files changed, 498 insertions(+), 479 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index b83d83c..1b70857 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -151,20 +151,11 @@ public abstract class AbstractContainerAllocator implements Runnable {
log.info("Found available resources on {}. Assigning request for container_id {} with "
+ "timestamp {} to resource {}",
new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()});
- try {
- //launches a StreamProcessor on the resource
- clusterResourceManager.launchStreamProcessor(resource, builder);
- if (state.neededContainers.decrementAndGet() == 0) {
- state.jobHealthy.set(true);
- }
- state.runningContainers.put(request.getContainerID(), resource);
-
- } catch (SamzaContainerLaunchException e) {
- log.warn(String.format("Got exception while starting resource %s. Requesting a new resource on any host", resource), e);
- resourceRequestState.releaseUnstartableContainer(resource);
- requestResource(containerID, ResourceRequestState.ANY_HOST);
- }
+ //Submit a request to launch a StreamProcessor on the provided resource. To match with the response returned later
+ //in the callback, we should also store state about the container whose launch is pending.
+ clusterResourceManager.launchStreamProcessor(resource, builder);
+ state.pendingContainers.put(containerID, resource);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 715cf66..f8a8c8b 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -105,15 +105,21 @@ public abstract class ClusterResourceManager {
public abstract void releaseResources(SamzaResource resource);
/***
- * Requests the launch of a StreamProcessor with the specified context on the resource.
+ * Requests the launch of a StreamProcessor with the specified context on the resource asynchronously.
+ *
+ * <p>
+ * Either {@link Callback#onStreamProcessorLaunchSuccess(SamzaResource)} or
+ * {@link Callback#onStreamProcessorLaunchFailure(SamzaResource, Throwable)} will be invoked
+ * to indicate the result of this operation.
+ * </p>
+ *
* @param resource the specified resource
* @param builder A builder implementation that encapsulates the context for the
* StreamProcessor. A builder encapsulates the ID for the processor, the
* build environment, the command to execute etc.
- * @throws SamzaContainerLaunchException when there's an error during the requesting launch.
*
*/
- public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException;
+ public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder);
public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
@@ -143,6 +149,20 @@ public abstract class ClusterResourceManager {
*/
void onResourcesCompleted(List<SamzaResourceStatus> resources);
+
+ /**
+ * Callback invoked when the launch request for a StreamProcessor on the {@link SamzaResource} is successful.
+ * @param resource the resource on which the StreamProcessor is launched
+ */
+ void onStreamProcessorLaunchSuccess(SamzaResource resource);
+
+ /**
+ * Callback invoked when there is a failure in launching a StreamProcessor on the provided {@link SamzaResource}.
+ * @param resource the resource on which the StreamProcessor was submitted for launching
+ * @param t the error in launching the StreamProcessor
+ */
+ void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t);
+
/***
* This callback is invoked when there is an error in the ClusterResourceManager. This is
* guaranteed to be invoked when there is an uncaught exception in any other
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 6a18b84..474ac8c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -105,6 +105,18 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
private final ContainerProcessManagerMetrics metrics;
+ //for testing
+ ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry, AbstractContainerAllocator allocator, ClusterResourceManager manager) {
+ this.state = state;
+ this.clusterManagerConfig = new ClusterManagerConfig(config);
+ this.jobConfig = new JobConfig(config);
+ this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+ this.clusterResourceManager = manager;
+ this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+ this.containerAllocator = allocator;
+ this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+ }
+
public ContainerProcessManager(Config config,
SamzaApplicationState state,
MetricsRegistryMap registry) {
@@ -153,7 +165,6 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
log.info("finished initialization of samza task manager");
-
}
public boolean shouldShutdown() {
@@ -383,6 +394,50 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
}
}
+ @Override
+ public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
+
+ // 1. Obtain the Samza container Id for the pending container on this resource.
+ String containerId = getPendingContainerId(resource.getResourceID());
+ log.info("Successfully started container ID: {} on resource: {}", containerId, resource);
+
+ // 2. Remove the container from the pending buffer and add it to the running buffer. Additionally, update the
+ // job-health metric.
+ if (containerId != null) {
+ log.info("Moving containerID: {} on resource: {} from pending to running state", containerId, resource);
+ state.pendingContainers.remove(containerId);
+ state.runningContainers.put(containerId, resource);
+
+ if (state.neededContainers.decrementAndGet() == 0) {
+ state.jobHealthy.set(true);
+ }
+ } else {
+ log.warn("SamzaResource {} was not in pending state. Got an invalid callback for a launch request that " +
+ "was not issued", resource);
+ }
+ }
+
+ @Override
+ public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t) {
+ log.error("Got a launch failure for SamzaResource {} with exception {}", resource, t);
+ // 1. Release resources for containers that failed back to YARN
+ log.info("Releasing unstartable container {}", resource.getResourceID());
+ clusterResourceManager.releaseResources(resource);
+
+ // 2. Obtain the Samza container Id for the pending container on this resource.
+ String containerId = getPendingContainerId(resource.getResourceID());
+ log.info("Failed container ID: {} for resourceId: {}", containerId, resource.getResourceID());
+
+ // 3. Re-request resources on ANY_HOST in case of launch failures on the preferred host.
+ if (containerId != null) {
+ log.info("Launch of container ID: {} failed on host: {}. Falling back to ANY_HOST", containerId, resource.getHost());
+ containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
+ } else {
+ log.warn("SamzaResource {} was not in pending state. Got an invalid callback for a launch request that was " +
+ "not issued", resource);
+ }
+ }
+
/**
* An error in the callback terminates the JobCoordinator
* @param e the underlying exception/error
@@ -419,6 +474,21 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
return factory;
}
+ /**
+ * Obtains the ID of the Samza container pending launch on the provided resource.
+ *
+ * @param resourceId the Id of the resource
+ * @return the Id of the Samza container on this resource
+ */
+ private String getPendingContainerId(String resourceId) {
+ for (Map.Entry<String, SamzaResource> entry: state.pendingContainers.entrySet()) {
+ if (entry.getValue().getResourceID().equals(resourceId)) {
+ log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index adc6e51..0dcaace 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,13 @@ public class SamzaApplicationState {
public final AtomicInteger neededContainers = new AtomicInteger(0);
/**
- * Map of the samzaContainerId to the {@link SamzaResource} on which it is running
+ * Map of the samzaContainerId to the {@link SamzaResource} on which it is submitted for launch.
+ * Modified by both the NMCallback and the ContainerAllocator thread.
+ */
+ public final ConcurrentMap<String, SamzaResource> pendingContainers = new ConcurrentHashMap<String, SamzaResource>(0);
+
+ /**
+ * Map of the samzaContainerId to the {@link SamzaResource} on which it is running.
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index ba6ca2c..2927491 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -60,6 +60,14 @@ public class SamzaResource {
return result;
}
+ @Override
+ public String toString() {
+ return "SamzaResource{" +
+ "host='" + host + '\'' +
+ ", resourceID='" + resourceID + '\'' +
+ '}';
+ }
+
public int getNumCores() {
return numCores;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 862e5f9..9b2b4cf 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -63,7 +63,7 @@ public abstract class OperatorImpl<M, RM> {
private Counter numMessage;
private Timer handleMessageNs;
private Timer handleTimerNs;
- private long inputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+ private long currentWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
private long outputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
private TaskName taskName;
// Although the operator node is in the operator graph, the current task may not consume any message in it.
@@ -340,22 +340,22 @@ public abstract class OperatorImpl<M, RM> {
inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermark()).min(Long::compare).get();
}
- if (inputWatermark < inputWatermarkMin) {
+ if (currentWatermark < inputWatermarkMin) {
// advance the watermark time of this operator
- inputWatermark = inputWatermarkMin;
- LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId());
+ currentWatermark = inputWatermarkMin;
+ LOG.trace("Advance input watermark to {} in operator {}", currentWatermark, getOpImplId());
final Long outputWm;
final Collection<RM> output;
final WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
if (watermarkFn != null) {
// user-overrided watermark handling here
- output = (Collection<RM>) watermarkFn.processWatermark(inputWatermark);
+ output = (Collection<RM>) watermarkFn.processWatermark(currentWatermark);
outputWm = watermarkFn.getOutputWatermark();
} else {
// use samza-provided watermark handling
// default is to propagate the input watermark
- output = handleWatermark(inputWatermark, collector, coordinator);
+ output = handleWatermark(currentWatermark, collector, coordinator);
outputWm = getOutputWatermark();
}
@@ -398,7 +398,7 @@ public abstract class OperatorImpl<M, RM> {
/* package private for testing */
final long getInputWatermark() {
- return this.inputWatermark;
+ return this.currentWatermark;
}
/**
@@ -409,7 +409,7 @@ public abstract class OperatorImpl<M, RM> {
protected long getOutputWatermark() {
if (usedInCurrentTask) {
// default as input
- return getInputWatermark();
+ return this.currentWatermark;
} else {
// always emit the max to indicate no input will be emitted afterwards
return Long.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 0d13fb1..452cadc 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -19,10 +19,10 @@
package org.apache.samza.clustermanager;
+import com.google.common.collect.ImmutableList;
import org.apache.samza.job.CommandBuilder;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -47,9 +47,11 @@ public class MockClusterResourceManager extends ClusterResourceManager {
@Override
public void requestResources(SamzaResourceRequest resourceRequest) {
- SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(), resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
- List<SamzaResource> resources = Collections.singletonList(resource);
- resourceRequests.addAll(resources);
+ SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(),
+ resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
+ resourceRequests.add(resource);
+
+ clusterManagerCallback.onResourcesAvailable(ImmutableList.of(resource));
}
@Override
@@ -63,15 +65,16 @@ public class MockClusterResourceManager extends ClusterResourceManager {
}
@Override
- public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
+ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
if (nextException != null) {
- throw new SamzaContainerLaunchException(nextException);
+ clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(nextException));
+ } else {
+ launchedResources.add(resource);
+ clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
}
- launchedResources.add(resource);
for (MockContainerListener listener : mockContainerListeners) {
listener.postRunContainer(launchedResources.size());
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
index 5079625..4e6e2c9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java
@@ -38,6 +38,16 @@ public class MockClusterResourceManagerCallback implements ClusterResourceManage
}
@Override
+ public void onStreamProcessorLaunchSuccess(SamzaResource resource) {
+ // no op
+ }
+
+ @Override
+ public void onStreamProcessorLaunchFailure(SamzaResource resource, Throwable t) {
+ // no op
+ }
+
+ @Override
public void onError(Throwable e) {
error = e;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 734043a..4596673 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -30,7 +30,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
@@ -202,58 +201,4 @@ public class TestContainerAllocator {
listener.verify();
}
-
- /**
- * If the container fails to start e.g because it fails to connect to a NM on a host that
- * is down, the allocator should request a new container on a different host.
- */
- @Test
- public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
- final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
- final SamzaResource container1 = new SamzaResource(1, 1024, "2", "id0");
- manager.nextException = new IOException("Cant connect to RM");
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() {
- @Override
- public void run() {
- // The failed container should be released. The successful one should not.
- assertNotNull(manager.releasedResources);
- assertEquals(1, manager.releasedResources.size());
- assertTrue(manager.releasedResources.contains(container));
- }
- },
- new Runnable() {
- @Override
- public void run() {
- // Test that the first request assignment had a preferred host and the retry didn't
- assertEquals(2, requestState.assignedRequests.size());
-
- SamzaResourceRequest request = requestState.assignedRequests.remove();
- assertEquals("0", request.getContainerID());
- assertEquals("2", request.getPreferredHost());
-
- request = requestState.assignedRequests.remove();
- assertEquals("0", request.getContainerID());
- assertEquals("ANY_HOST", request.getPreferredHost());
-
- // This routine should be called after the retry is assigned, but before it's started.
- // So there should still be 1 container needed.
- assertEquals(1, state.neededContainers.get());
- }
- }, null
- );
- state.neededContainers.set(1);
- requestState.registerContainerListener(listener);
-
- containerAllocator.requestResource("0", "2");
- containerAllocator.addResource(container);
- containerAllocator.addResource(container1);
- allocatorThread.start();
-
- listener.verify();
-
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index e252b7d..5c2fe4a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class TestContainerProcessManager {
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
- private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
+ private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback);
private static volatile boolean isRunning = false;
@@ -125,7 +125,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
AbstractContainerAllocator allocator =
@@ -145,7 +145,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
allocator =
@@ -165,11 +165,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
@@ -207,7 +207,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
taskManager.start();
@@ -231,11 +231,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
@@ -258,6 +258,7 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(container);
assertFalse(taskManager.shouldShutdown());
taskManager.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS));
@@ -278,11 +279,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
@@ -305,7 +306,7 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
-
+ taskManager.onStreamProcessorLaunchSuccess(container);
// Create first container failure
taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));
@@ -316,8 +317,8 @@ public class TestContainerProcessManager {
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());
- assertEquals(2, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(2, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
taskManager.onResourceAllocated(container);
@@ -325,7 +326,7 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
-
+ taskManager.onStreamProcessorLaunchSuccess(container);
assertTrue(state.jobHealthy.get());
// Create a second failure
@@ -334,8 +335,8 @@ public class TestContainerProcessManager {
// The above failure should trigger a job shutdown because our retry count is set to 1
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
- assertEquals(2, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(2, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
assertFalse(state.jobHealthy.get());
assertTrue(taskManager.shouldShutdown());
assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);
@@ -355,11 +356,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -367,7 +368,7 @@ public class TestContainerProcessManager {
Thread thread = new Thread(allocator);
getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
- // Start the task manager
+ // Start the task clusterResourceManager
taskManager.start();
SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
@@ -389,6 +390,29 @@ public class TestContainerProcessManager {
}
@Test
+ public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
+ state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
+ Map<String, String> configMap = new HashMap<>();
+ configMap.putAll(getConfig());
+
+ MockContainerAllocator allocator = new MockContainerAllocator(
+ clusterResourceManager,
+ new MapConfig(config),
+ state);
+
+ ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), allocator,
+ clusterResourceManager);
+
+ manager.start();
+ SamzaResource resource = new SamzaResource(1, 1024, "abc", "resource-1");
+ state.pendingContainers.put("1", resource);
+ Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 1);
+ manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot launch container!"));
+ Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2);
+ Assert.assertEquals(clusterResourceManager.resourceRequests.get(1).getHost(), ResourceRequestState.ANY_HOST);
+ }
+
+ @Test
public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
Config conf = getConfig();
@@ -400,11 +424,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -424,6 +448,7 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(container1);
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
// Create container failure - with ContainerExitStatus.DISKS_FAILED
@@ -433,8 +458,8 @@ public class TestContainerProcessManager {
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());
- assertEquals(2, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(2, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
@@ -444,14 +469,16 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(container2);
+
assertTrue(state.jobHealthy.get());
// Simulate a duplicate notification for container 1 with a different exit code
taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.PREEMPTED));
// assert that a duplicate notification does not change metrics (including job health)
assertEquals(state.redundantNotifications.get(), 1);
- assertEquals(2, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(2, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
assertTrue(state.jobHealthy.get());
}
@@ -471,11 +498,11 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
MockContainerAllocator allocator = new MockContainerAllocator(
- manager,
+ clusterResourceManager,
conf,
state);
getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);
@@ -483,7 +510,7 @@ public class TestContainerProcessManager {
Thread thread = new Thread(allocator);
getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);
- // Start the task manager
+ // Start the task manager
taskManager.start();
assertFalse(taskManager.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -496,7 +523,7 @@ public class TestContainerProcessManager {
fail("timed out waiting for the containers to start");
}
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
-
+ taskManager.onStreamProcessorLaunchSuccess(container1);
// Create container failure - with ContainerExitStatus.DISKS_FAILED
taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));
@@ -504,8 +531,8 @@ public class TestContainerProcessManager {
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());
- assertEquals(2, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(2, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2");
@@ -515,10 +542,11 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(container2);
// Create container failure - with ContainerExitStatus.PREEMPTED
taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED));
- assertEquals(3, manager.resourceRequests.size());
+ assertEquals(3, clusterResourceManager.resourceRequests.size());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
@@ -532,14 +560,15 @@ public class TestContainerProcessManager {
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
+ taskManager.onStreamProcessorLaunchSuccess(container3);
// Create container failure - with ContainerExitStatus.ABORTED
taskManager.onResourceCompleted(new SamzaResourceStatus(container3.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- assertEquals(4, manager.resourceRequests.size());
- assertEquals(0, manager.releasedResources.size());
+ assertEquals(4, clusterResourceManager.resourceRequests.size());
+ assertEquals(0, clusterResourceManager.releasedResources.size());
assertFalse(taskManager.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
@@ -556,7 +585,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
taskManager.start();
SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
@@ -570,7 +599,7 @@ public class TestContainerProcessManager {
new MapConfig(config),
state,
new MetricsRegistryMap(),
- manager
+ clusterResourceManager
);
taskManager1.start();
taskManager1.onResourceAllocated(container2);
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 00198e9..6260b71 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.clustermanager;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
@@ -204,61 +203,6 @@ public class TestHostAwareContainerAllocator {
}
/**
- * If the container fails to start e.g because it fails to connect to a NM on a host that
- * is down, the allocator should request a new container on a different host.
- */
- @Test
- public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-
- final SamzaResource container = new SamzaResource(1, 1024, "2", "id0");
- final SamzaResource container1 = new SamzaResource(1, 1024, "1", "id1");
- manager.nextException = new IOException("Cant connect to RM");
-
- // Set up our final asserts before starting the allocator thread
- MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() {
- @Override
- public void run() {
- // The failed container should be released. The successful one should not.
- assertNotNull(manager.releasedResources);
- assertEquals(1, manager.releasedResources.size());
- assertTrue(manager.releasedResources.contains(container));
- }
- },
- new Runnable() {
- @Override
- public void run() {
- // Test that the first request assignment had a preferred host and the retry didn't
- assertEquals(2, requestState.assignedRequests.size());
-
- SamzaResourceRequest request = requestState.assignedRequests.remove();
- assertEquals("0", request.getContainerID());
- assertEquals("2", request.getPreferredHost());
-
- request = requestState.assignedRequests.remove();
- assertEquals("0", request.getContainerID());
- assertEquals("ANY_HOST", request.getPreferredHost());
-
- // This routine should be called after the retry is assigned, but before it's started.
- // So there should still be 1 container needed.
- assertEquals(1, state.neededContainers.get());
- }
- }, null
- );
- state.neededContainers.set(1);
- requestState.registerContainerListener(listener);
-
- // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
- containerAllocator.requestResource("0", "2");
- containerAllocator.addResource(container1);
- containerAllocator.addResource(container);
-
- allocatorThread.start();
-
- listener.verify();
- }
-
-
- /**
* Handles expired requests correctly and assigns ANY_HOST
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 904367b..a4f0354 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
+
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.metrics.Counter;
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index db67de6..fb9a9c2 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -49,7 +49,9 @@ public class YarnAppState {
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
- public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>() ;
+ public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>();
+
+ public Map<String, YarnContainer> pendingYarnContainers = new ConcurrentHashMap<String, YarnContainer>();
public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 96a4488..9be8475 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,30 +19,46 @@
package org.apache.samza.job.yarn;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.*;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.*;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.Util;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
private final String INVALID_YARN_CONTAINER_ID = "-1";
@@ -71,14 +87,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
/**
- * A helper class to launch Yarn containers.
- */
- private final YarnContainerRunner yarnContainerRunner;
-
- /**
* Configuration and state specific to Yarn.
*/
- private final YarnConfiguration hConfig;
+ private final YarnConfiguration yarnConfiguration;
private final YarnAppState state;
/**
@@ -100,12 +111,16 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ContainerId, Container> containersPendingStartup = new ConcurrentHashMap<>();
+
private final SamzaAppMasterMetrics metrics;
final AtomicBoolean started = new AtomicBoolean(false);
private final Object lock = new Object();
+ private final NMClientAsync nmClientAsync;
private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
+ private final Config config;
/**
* Creates an YarnClusterResourceManager from config, a jobModelReader and a callback.
@@ -116,15 +131,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
*/
public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaAppState ) {
super(callback);
- hConfig = new YarnConfiguration();
- hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+ yarnConfiguration = new YarnConfiguration();
+ yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
// Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" for YarnConfiguration
FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
fsImplConfig.getSchemes().forEach(
scheme -> {
fsImplConfig.getSchemeConfig(scheme).forEach(
- (confKey, confValue) -> hConfig.set(confKey, confValue)
+ (confKey, confValue) -> yarnConfiguration.set(confKey, confValue)
);
}
);
@@ -143,6 +158,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
int nodeHttpPort = Integer.parseInt(nodeHttpPortString);
YarnConfig yarnConfig = new YarnConfig(config);
this.yarnConfig = yarnConfig;
+ this.config = config;
int interval = yarnConfig.getAMPollIntervalMs();
//Instantiate the AM Client.
@@ -151,7 +167,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort);
log.info("Initialized YarnAppState: {}", state.toString());
- this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig);
+ this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, yarnConfiguration);
log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
@@ -162,8 +178,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
state,
amClient
);
+ this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
- yarnContainerRunner = new YarnContainerRunner(config, hConfig);
}
/**
@@ -179,8 +195,10 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
metrics.start();
service.onInit();
log.info("Starting YarnContainerManager.");
- amClient.init(hConfig);
+ amClient.init(yarnConfiguration);
amClient.start();
+ nmClientAsync.init(yarnConfiguration);
+ nmClientAsync.start();
lifecycle.onInit();
if(lifecycle.shouldShutdown()) {
@@ -262,19 +280,22 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
*/
@Override
- public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
+ public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
- log.info("Received launch request for {} on hostname {}", containerIDStr , resource.getHost());
-
+ log.info("Received launch request for {} on hostname {}", containerIDStr, resource.getHost());
synchronized (lock) {
- Container container = allocatedResources.get(resource);
- if (container == null) {
- log.info("Resource {} already released. ", resource);
- return;
- }
+ try {
+ Container container = allocatedResources.get(resource);
+ if (container == null) {
+ log.info("Resource {} already released. ", resource);
+ return;
+ }
- state.runningYarnContainers.put(containerIDStr, new YarnContainer(container));
- yarnContainerRunner.runContainer(containerIDStr, container, builder);
+ runContainer(containerIDStr, container, builder);
+ } catch (Throwable t) {
+ log.error("Error in launching stream processor:", t);
+ clusterManagerCallback.onStreamProcessorLaunchFailure(resource, t);
+ }
}
}
@@ -338,6 +359,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
lifecycle.onShutdown(status);
amClient.stop();
log.info("Stopping the AM service " );
+ nmClientAsync.stop();
+ log.info("Stopping the NM service " );
service.onShutdown();
metrics.stop();
@@ -358,7 +381,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
FileSystem fs = null;
try {
- fs = FileSystem.get(hConfig);
+ fs = FileSystem.get(yarnConfiguration);
} catch (IOException e) {
log.error("Unable to clean up file system: {}", e);
return;
@@ -454,4 +477,243 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
clusterManagerCallback.onError(e);
}
+ @Override
+ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);
+ String samzaContainerId = getPendingSamzaContainerId(containerId);
+
+ if (samzaContainerId != null) {
+ // 1. Move the container from pending to running state
+ final YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId);
+ log.info("Samza containerId:{} has started", samzaContainerId);
+
+ state.runningYarnContainers.put(samzaContainerId, container);
+
+ // 2. Invoke the success callback.
+ SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(),
+ container.resource().getMemory(), container.nodeId().getHost(), containerId.toString());
+ clusterManagerCallback.onStreamProcessorLaunchSuccess(resource);
+ } else {
+ log.info("Got an invalid notification from YARN for container: {}", containerId);
+ }
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ log.info("Got a status from the NodeManager. Container: {} Status: {}", containerId, containerStatus.getState());
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {}", containerId);
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ log.error(String.format("Container: %s could not start.", containerId), t);
+
+ Container container = containersPendingStartup.remove(containerId);
+
+ if (container != null) {
+ SamzaResource resource = new SamzaResource(container.getResource().getVirtualCores(),
+ container.getResource().getMemory(), container.getNodeId().getHost(), containerId.toString());
+ log.info("Invoking failure callback for container: {}", containerId);
+ clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t));
+ } else {
+ log.info("Got an invalid notification for container: {}", containerId);
+ }
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+ log.info("Got an error on getContainerStatus from the NodeManager. ContainerId: {}. Error: {}", containerId, t);
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ log.info("Got an error when stopping container from the NodeManager. ContainerId: {}. Error: {}", containerId, t);
+ }
+
+ /**
+ * Runs a process as specified by the command builder on the container.
+ * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
+ * @param container the YARN container in which to run the Samza container process.
+ * @param cmdBuilder the command builder that encapsulates the command, and the context
+ *
+ */
+ public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws IOException {
+ String containerIdStr = ConverterUtils.toString(container.getId());
+ log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
+
+ // check if we have framework path specified. If yes - use it, if not use default ./__package/
+ String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
+ String cmdPath = "./__package/";
+
+ String fwkPath = JobConfig.getFwkPath(this.config);
+ if(fwkPath != null && (! fwkPath.isEmpty())) {
+ cmdPath = fwkPath;
+ jobLib = "export JOB_LIB_DIR=./__package/lib";
+ }
+ log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
+ cmdBuilder.setCommandPath(cmdPath);
+
+
+ String command = cmdBuilder.buildCommand();
+ log.info("Container ID {} using command {}", samzaContainerId, command);
+
+ Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
+ env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
+ printContainerEnvironmentVariables(samzaContainerId, env);
+
+ log.info("Samza FWK path: " + command + "; env=" + env);
+
+ Path packagePath = new Path(yarnConfig.getPackagePath());
+ log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
+ state.pendingYarnContainers.put(samzaContainerId, new YarnContainer(container));
+
+ startContainer(
+ packagePath,
+ container,
+ env,
+ getFormattedCommand(
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ jobLib,
+ command,
+ ApplicationConstants.STDOUT,
+ ApplicationConstants.STDERR)
+ );
+
+
+ log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
+ new Object[]{
+ samzaContainerId,
+ containerIdStr,
+ container.getNodeId().getHost(),
+ container.getNodeHttpAddress(),
+ containerIdStr}
+ );
+
+ log.info("Started container ID {}", samzaContainerId);
+ }
+
+ /**
+ * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
+ * specified by packagePath.
+ */
+ private void startContainer(Path packagePath,
+ Container container,
+ Map<String, String> env,
+ final String cmd) throws IOException {
+ log.info("Starting container {} {} {} {}",
+ new Object[]{packagePath, container, env, cmd});
+
+ LocalResource packageResource = Records.newRecord(LocalResource.class);
+ URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
+ FileStatus fileStatus;
+ fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
+ packageResource.setResource(packageUrl);
+ log.info("Set package resource in YarnContainerRunner for {}", packageUrl);
+ packageResource.setSize(fileStatus.getLen());
+ packageResource.setTimestamp(fileStatus.getModificationTime());
+ packageResource.setType(LocalResourceType.ARCHIVE);
+ packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ ByteBuffer allTokens;
+ // copy tokens to start the container
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+
+ // now remove the AM->RM token so that containers cannot access it
+ Iterator iter = credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ TokenIdentifier token = ((org.apache.hadoop.security.token.Token) iter.next()).decodeIdentifier();
+ if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ iter.remove();
+ }
+ }
+ allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+ Map<String, LocalResource> localResourceMap = new HashMap<>();
+ localResourceMap.put("__package", packageResource);
+
+ // include the resources from the universal resource configurations
+ LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
+ localResourceMap.putAll(resourceMapper.getResourceMap());
+
+ ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+ context.setEnvironment(env);
+ context.setTokens(allTokens.duplicate());
+ context.setCommands(new ArrayList<String>() {{add(cmd);}});
+ context.setLocalResources(localResourceMap);
+
+ log.debug("Setting localResourceMap to {}", localResourceMap);
+ log.debug("Setting context to {}", context);
+
+ StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
+ startContainerRequest.setContainerLaunchContext(context);
+
+ log.info("Making an async start request for container {}", container);
+ nmClientAsync.startContainerAsync(container, context);
+ }
+
+ /**
+ * @param samzaContainerId the Samza container Id for logging purposes.
+ * @param env the Map of environment variables to their respective values.
+ */
+ private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
+ }
+ log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
+ }
+
+
+ /**
+ * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
+ *
+ * @param cmdBuilder the command builder containing the environment variables.
+ * @return the map containing the escaped environment variables.
+ */
+ private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
+ Map<String, String> env = new HashMap<String, String>();
+ for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
+ String escapedValue = Util.envVarEscape(entry.getValue());
+ env.put(entry.getKey(), escapedValue);
+ }
+ return env;
+ }
+
+
+ private String getFormattedCommand(String logDirExpansionVar,
+ String jobLib,
+ String command,
+ String stdOut,
+ String stdErr) {
+ if (!jobLib.isEmpty()) {
+ jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
+ }
+
+ return String
+ .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
+ jobLib, logDirExpansionVar, command, stdOut, stdErr);
+ }
+
+ /**
+ * Returns the Id of the Samza container that corresponds to the provided Yarn {@link ContainerId}
+ * @param containerId the Yarn ContainerId
+ * @return the id of the Samza container corresponding to the {@link ContainerId} that is pending launch
+ */
+ private String getPendingSamzaContainerId(ContainerId containerId) {
+ for (String samzaContainerId: state.pendingYarnContainers.keySet()) {
+ YarnContainer yarnContainer = state.pendingYarnContainers.get(samzaContainerId);
+ if (yarnContainer.id().equals(containerId)) {
+ return samzaContainerId;
+ }
+ }
+ return null;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/882f61a6/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
deleted file mode 100644
index cdcf2d1..0000000
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.job.yarn;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.samza.clustermanager.SamzaContainerLaunchException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.YarnConfig;
-import org.apache.samza.job.CommandBuilder;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A Helper class to run container processes on Yarn. This encapsulates quite a bit of YarnContainer
- * boiler plate: it builds the launch command and environment, registers the job package as a local
- * resource, attaches the current user's credentials and asks the {@link NMClient} to start the container.
- */
-public class YarnContainerRunner {
-  private static final Logger log = LoggerFactory.getLogger(YarnContainerRunner.class);
-
-  private final Config config;
-  private final YarnConfiguration yarnConfiguration;
-
-  private final NMClient nmClient;
-  private final YarnConfig yarnConfig;
-
-  /**
-   * Create a new Runner from a Config.
-   * @param config to instantiate the runner with
-   * @param yarnConfiguration the yarn config for the cluster to connect to.
-   */
-  public YarnContainerRunner(Config config,
-                             YarnConfiguration yarnConfiguration) {
-    this.config = config;
-    this.yarnConfiguration = yarnConfiguration;
-
-    this.nmClient = NMClient.createNMClient();
-    nmClient.init(this.yarnConfiguration);
-
-    this.yarnConfig = new YarnConfig(config);
-  }
-
-  /**
-   * Runs a process as specified by the command builder on the container.
-   * @param samzaContainerId id of the samza Container to run (passed as a command line parameter to the process)
-   * @param container the Yarn container to run the process on.
-   * @param cmdBuilder the command builder that encapsulates the command, and the context
-   *
-   * @throws SamzaContainerLaunchException when the package status cannot be read, the credentials cannot be
-   *         written, or the NodeManager client fails to start the container.
-   *
-   */
-  public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
-    String containerIdStr = ConverterUtils.toString(container.getId());
-    log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
-
-    // check if we have framework path specified. If yes - use it, if not use default ./__package/
-    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
-    String cmdPath = "./__package/";
-
-    String fwkPath = JobConfig.getFwkPath(config);
-    if (fwkPath != null && !fwkPath.isEmpty()) {
-      cmdPath = fwkPath;
-      jobLib = "export JOB_LIB_DIR=./__package/lib";
-    }
-    log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
-    cmdBuilder.setCommandPath(cmdPath);
-
-    String command = cmdBuilder.buildCommand();
-    log.info("Container ID {} using command {}", samzaContainerId, command);
-
-    Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
-    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
-    printContainerEnvironmentVariables(samzaContainerId, env);
-
-    log.info("Samza FWK path: " + command + "; env=" + env);
-
-    Path packagePath = new Path(yarnConfig.getPackagePath());
-    log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath);
-
-    startContainer(
-        packagePath,
-        container,
-        env,
-        getFormattedCommand(
-            ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-            jobLib,
-            command,
-            ApplicationConstants.STDOUT,
-            ApplicationConstants.STDERR)
-    );
-
-    log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).",
-        new Object[]{
-            samzaContainerId,
-            containerIdStr,
-            container.getNodeId().getHost(),
-            container.getNodeHttpAddress(),
-            containerIdStr}
-    );
-
-    log.info("Started container ID {}", samzaContainerId);
-  }
-
-  /**
-   * Runs a command as a process on the container. All binaries needed by the physical process are packaged in the URL
-   * specified by packagePath.
-   *
-   * @param packagePath location of the job package; registered as the "__package" local resource
-   * @param container the Yarn container to start
-   * @param env environment variables for the launched process
-   * @param cmd the single shell command line to execute in the container
-   * @throws SamzaContainerLaunchException when the package status cannot be read, the credentials cannot be
-   *         written, or the NodeManager client fails to start the container; the underlying exception is
-   *         attached as the cause.
-   */
-  private void startContainer(Path packagePath,
-                              Container container,
-                              Map<String, String> env,
-                              final String cmd) throws SamzaContainerLaunchException {
-    log.info("starting container {} {} {} {}",
-        new Object[]{packagePath, container, env, cmd});
-
-    // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one.
-    // But keep it now for backward compatibility.
-    // set the local package so that the containers and app master are provisioned with it
-    LocalResource packageResource = Records.newRecord(LocalResource.class);
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath);
-    FileStatus fileStatus;
-    try {
-      fileStatus = packagePath.getFileSystem(yarnConfiguration).getFileStatus(packagePath);
-    } catch (IOException ioe) {
-      log.error("IO Exception when accessing the package status from the filesystem", ioe);
-      // keep the original exception as the cause so callers can see why the lookup failed
-      throw new SamzaContainerLaunchException("IO Exception when accessing the package status from the filesystem", ioe);
-    }
-
-    packageResource.setResource(packageUrl);
-    log.info("set package Resource in YarnContainerRunner for {}", packageUrl);
-    packageResource.setSize(fileStatus.getLen());
-    packageResource.setTimestamp(fileStatus.getModificationTime());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    ByteBuffer allTokens;
-    // copy tokens (copied from dist shell example)
-    try {
-      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-      DataOutputBuffer dob = new DataOutputBuffer();
-      credentials.writeTokenStorageToStream(dob);
-
-      // now remove the AM->RM token so that containers cannot access it
-      // NOTE(review): the credentials are already serialized into dob above, so this removal does not
-      // appear to change the bytes handed to the container - confirm whether the order is intended.
-      Iterator<Token<? extends TokenIdentifier>> iter = credentials.getAllTokens().iterator();
-      while (iter.hasNext()) {
-        TokenIdentifier token = iter.next().decodeIdentifier();
-        if (token != null && token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-          iter.remove();
-        }
-      }
-      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-    } catch (IOException ioe) {
-      log.error("IOException when writing credentials.", ioe);
-      // keep the original exception as the cause so callers can see why serialization failed
-      throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer", ioe);
-    }
-
-    Map<String, LocalResource> localResourceMap = new HashMap<>();
-    localResourceMap.put("__package", packageResource);
-
-    // include the resources from the universal resource configurations
-    LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration);
-    localResourceMap.putAll(resourceMapper.getResourceMap());
-
-    // the launch context takes a list of commands; the whole launch line is its single element
-    ArrayList<String> commands = new ArrayList<>();
-    commands.add(cmd);
-
-    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
-    context.setEnvironment(env);
-    context.setTokens(allTokens.duplicate());
-    context.setCommands(commands);
-    context.setLocalResources(localResourceMap);
-
-    log.debug("setting localResourceMap to {}", localResourceMap);
-    log.debug("setting context to {}", context);
-
-    try {
-      nmClient.startContainer(container, context);
-    } catch (YarnException ye) {
-      log.error("Received YarnException when starting container: " + container.getId(), ye);
-      throw new SamzaContainerLaunchException("Received YarnException when starting container: " + container.getId(), ye);
-    } catch (IOException ioe) {
-      log.error("Received IOException when starting container: " + container.getId(), ioe);
-      throw new SamzaContainerLaunchException("Received IOException when starting container: " + container.getId(), ioe);
-    }
-  }
-
-
-  /**
-   * Logs every environment variable of the container, one {@code key=value} pair per line.
-   *
-   * @param samzaContainerId the Samza container Id for logging purposes.
-   * @param env the Map of environment variables to their respective values.
-   */
-  private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, String> entry : env.entrySet()) {
-      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
-    }
-    log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString());
-  }
-
-
-  /**
-   * Gets the environment variables from the specified {@link CommandBuilder} and escapes certain characters.
-   *
-   * @param cmdBuilder the command builder containing the environment variables.
-   * @return the map containing the escaped environment variables.
-   */
-  private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder cmdBuilder) {
-    Map<String, String> env = new HashMap<>();
-    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
-      String escapedValue = Util.envVarEscape(entry.getValue());
-      env.put(entry.getKey(), escapedValue);
-    }
-    return env;
-  }
-
-
-  /**
-   * Assembles the shell line used to launch the container process. The line exports
-   * {@code SAMZA_LOG_DIR}, optionally chains the {@code jobLib} statement, links {@code logs} to the
-   * log directory and finally {@code exec}s the command with stdout/stderr redirected under {@code logs/}.
-   *
-   * @param logDirExpansionVar the value exported as SAMZA_LOG_DIR and used as the target of the logs link
-   * @param jobLib an extra shell statement to chain after the export; may be empty, must not be null
-   * @param command the command to exec
-   * @param stdOut file name (under logs/) that receives standard output
-   * @param stdErr file name (under logs/) that receives standard error
-   * @return the formatted command line
-   */
-  private String getFormattedCommand(String logDirExpansionVar,
-                                     String jobLib,
-                                     String command,
-                                     String stdOut,
-                                     String stdErr) {
-    if (!jobLib.isEmpty()) {
-      jobLib = "&& " + jobLib; // add job's libraries exported to an env variable
-    }
-
-    return String
-        .format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", logDirExpansionVar,
-            jobLib, logDirExpansionVar, command, stdOut, stdErr);
-  }
-}