You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/04 22:54:21 UTC
[samza] branch master updated: SAMZA-2475: Add an allocated resource
expiry timeout in samza yarn type of apps (#1303)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5c9ef SAMZA-2475: Add an allocated resource expiry timeout in samza yarn type of apps (#1303)
9d5c9ef is described below
commit 9d5c9ef87e0c27d35e136998f4ef9f1df561cb4c
Author: mynameborat <bh...@apache.org>
AuthorDate: Wed Mar 4 14:54:12 2020 -0800
SAMZA-2475: Add an allocated resource expiry timeout in samza yarn type of apps (#1303)
---
.../clustermanager/ClusterResourceManager.java | 9 +++++
.../samza/clustermanager/ContainerAllocator.java | 8 ++++
.../samza/clustermanager/ContainerManager.java | 23 +++++++++++
.../apache/samza/clustermanager/SamzaResource.java | 18 +++++++++
.../clustermanager/MockClusterResourceManager.java | 12 ++++++
.../TestContainerAllocatorWithHostAffinity.java | 45 ++++++++++++++++++++++
.../samza/job/yarn/YarnClusterResourceManager.java | 10 +++++
7 files changed, 125 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 276bb4c..8ea3c30 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -132,6 +132,15 @@ public abstract class ClusterResourceManager {
public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
+ /**
+ * Checks whether an allocated resource has expired.
+ * This default implementation is for {@link ClusterResourceManager}s that have no concept of an expired
+ * allocated resource: it always returns {@code false}, i.e. allocated resources are assumed never to expire.
+ * Implementations whose cluster can expire allocations should override this method.
+ * @param resource allocated resource
+ * @return {@code true} if the allocated resource is expired, {@code false} otherwise
+ */
+ public boolean isResourceExpired(SamzaResource resource) {
+ return false;
+ }
/***
* Defines a callback interface for interacting with notifications from a ClusterResourceManager
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 2e223fc..2661611 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -261,6 +261,14 @@ public class ContainerAllocator implements Runnable {
throw new SamzaException("Expected resource for Processor ID: " + request.getProcessorId() + " was unavailable on host: " + preferredHost);
}
+ /**
+ * If the allocated resource has expired then release the expired resource and re-request the resources from {@link ClusterResourceManager}
+ */
+ if (clusterResourceManager.isResourceExpired(resource)) {
+ containerManager.handleExpiredResource(request, resource, preferredHost, resourceRequestState, this);
+ return;
+ }
+
// Update state
resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
String processorId = request.getProcessorId();
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 3e3a060..7ba97ee 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -305,6 +305,29 @@ public class ContainerManager {
}
/**
+ * Recovers from an allocated resource that expired before a container was launched on it.
+ * The expired resource is released through {@code resourceRequestState}, the pending request it was matched
+ * against is cancelled, and a fresh request for the same processor and preferred host is issued via the allocator.
+ * If a container placement action is active for the processor, the fresh request is recorded on its metadata.
+ *
+ * @param request pending request for the preferred host
+ * @param resource resource allocated from {@link ClusterResourceManager} which has expired
+ * @param preferredHost host on which container is requested to be deployed
+ * @param resourceRequestState state of request in {@link ContainerAllocator}
+ * @param allocator allocator for requesting resources
+ */
+ void handleExpiredResource(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
+ ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
+ String processorId = request.getProcessorId();
+ LOG.info("Allocated resource {} has expired for Processor ID: {} request: {}. Re-requesting resource again",
+ resource, processorId, request);
+
+ // Hand back the stale allocation and drop the request it satisfied.
+ resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
+ resourceRequestState.cancelResourceRequest(request);
+
+ // Build the replacement request; an in-flight placement action must track it as well.
+ SamzaResourceRequest replacementRequest = allocator.getResourceRequest(processorId, request.getPreferredHost());
+ boolean placementActionActive = hasActiveContainerPlacementAction(replacementRequest.getProcessorId());
+ if (placementActionActive) {
+ ContainerPlacementMetadata placementMetadata = getPlacementActionMetadata(processorId).get();
+ placementMetadata.recordResourceRequest(replacementRequest);
+ }
+ allocator.issueResourceRequest(replacementRequest);
+ }
+
+ /**
* Registers a container placement action to move the running container to destination host, if destination host is same as the
* host on which container is running, container placement action is treated as a restart.
*
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index 4d0bf91..30c0902 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -19,6 +19,9 @@
package org.apache.samza.clustermanager;
+import com.google.common.annotations.VisibleForTesting;
+
+
/**
* Specification of a Samza Resource. A resource is identified by a unique resource ID.
* A resource is currently comprised of CPUs and Memory resources on a host.
@@ -28,6 +31,7 @@ public class SamzaResource {
private final int memoryMb;
private final String host;
private final String containerId;
+ private final long timestamp;
//TODO: Investigate adding disk space. Mesos supports disk based reservations.
@@ -36,6 +40,16 @@ public class SamzaResource {
this.memoryMb = memoryMb;
this.host = host;
this.containerId = containerId;
+ this.timestamp = System.currentTimeMillis();
+ }
+
+ /**
+ * Creates a resource with an explicitly supplied creation timestamp instead of the current time,
+ * so tests can construct resources that appear to have been allocated in the past.
+ *
+ * @param timestamp creation time in milliseconds since the epoch (same clock as {@code System.currentTimeMillis()})
+ */
+ @VisibleForTesting
+ SamzaResource(int numCores, int memoryMb, String host, String containerId, long timestamp) {
+ this.containerId = containerId;
+ this.host = host;
+ this.numCores = numCores;
+ this.memoryMb = memoryMb;
+ this.timestamp = timestamp;
}
@Override
@@ -82,4 +96,8 @@ public class SamzaResource {
public String getContainerId() {
return containerId;
}
+
+ /**
+ * Returns the creation time of this resource object, in milliseconds since the epoch.
+ * Resources created without an explicit timestamp are stamped with {@code System.currentTimeMillis()}
+ * when constructed.
+ *
+ * @return creation timestamp in epoch milliseconds
+ */
+ public long getTimestamp() {
+ return this.timestamp;
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index d50ce59..e4be156 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -20,6 +20,7 @@
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableList;
+import java.time.Duration;
import org.apache.samza.job.CommandBuilder;
import org.junit.Assert;
@@ -108,6 +109,13 @@ public class MockClusterResourceManager extends ClusterResourceManager {
clusterManagerCallback.onResourcesCompleted(statList);
}
+ @Override
+ public boolean isResourceExpired(SamzaResource resource) {
+ // Mimic a 10 minute YARN allocation expiry, less a 30 second margin.
+ long expiryMillis = Duration.ofMinutes(10).minusSeconds(30).toMillis();
+ long ageMillis = System.currentTimeMillis() - resource.getTimestamp();
+ return ageMillis > expiryMillis;
+ }
+
+
public void registerContainerListener(MockContainerListener listener) {
mockContainerListeners.add(listener);
}
@@ -116,6 +124,10 @@ public class MockClusterResourceManager extends ClusterResourceManager {
mockContainerListeners.clear();
}
+ /**
+ * Test hook: reports whether the given resource is present in this mock's {@code releasedResources}.
+ *
+ * @param resource resource to look up
+ * @return {@code true} if {@code resource} is in {@code releasedResources}
+ */
+ public boolean containsReleasedResource(SamzaResource resource) {
+ return this.releasedResources.contains(resource);
+ }
+
@Override
public void stop(SamzaApplicationState.SamzaAppStatus status) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index bf2eabf..593ddb9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -511,6 +511,51 @@ public class TestContainerAllocatorWithHostAffinity {
containerAllocator.stop();
}
+ @Test(timeout = 5000)
+ public void testExpiredAllocatedResourcesAreReleased() throws Exception {
+ ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
+ MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
+ ContainerManager spyContainerManager =
+ spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false));
+
+ // The host-0 resource is stamped 10 minutes in the past, beyond the mock cluster manager's expiry
+ // threshold, so it is already expired when the allocator first tries to use it.
+ SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
+ System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+ spyAllocator = Mockito.spy(
+ new ContainerAllocator(mockClusterResourceManager, config, state, true, spyContainerManager));
+ spyAllocator.addResource(expiredAllocatedResource);
+ spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "id1"));
+
+ // Request Preferred Resources
+ spyAllocator.requestResources(new HashMap<String, String>() {
+ {
+ put("0", "host-0");
+ put("1", "host-1");
+ }
+ });
+
+ spyAllocatorThread = new Thread(spyAllocator);
+ // Start the container allocator thread periodic assignment
+ spyAllocatorThread.start();
+
+ // Expected preferred-host request count: the 2 issued above plus 1 re-request for the expired resource
+ while (state.preferredHostRequests.get() != 3) {
+ Thread.sleep(100);
+ }
+
+ // Verify that handleExpiredResource was invoked once, for the expired resource on host-0
+ ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
+ ArgumentCaptor<SamzaResource> resourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
+ verify(spyContainerManager, times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
+ resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
+ resourceRequestCaptor.getAllValues()
+ .forEach(resourceRequest -> assertEquals("0", resourceRequest.getProcessorId()));
+ resourceArgumentCaptor.getAllValues()
+ .forEach(resource -> assertEquals("host-0", resource.getHost()));
+ // Verify the expired resource was released
+ assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
+ // Stop the allocator this test started (not the shared containerAllocator) so its thread does not outlive the test
+ spyAllocator.stop();
+ }
+
//@Test
public void testExpiryWithNonResponsiveClusterManager() throws Exception {
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 43c49cc..8d23e04 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.job.yarn;
+import java.time.Duration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -575,6 +576,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
}
}
+ @Override
+ public boolean isResourceExpired(SamzaResource resource) {
+ // Expired once the time since the resource was allocated exceeds the YARN RM container allocation expiry
+ // interval minus 30 sec. The 30 sec is a safety margin (presumably for clock skew / the RM timer starting
+ // before this SamzaResource was stamped).
+ // DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS is in milliseconds, so it must be converted with ofMillis;
+ // ofMinutes would turn the 10 minute default into ~416 days and resources would never be seen as expired.
+ // NOTE(review): this uses YARN's default interval; a cluster-level override of the interval is not reflected.
+ Duration yarnAllocatedResourceExpiry =
+ Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+ .minus(Duration.ofSeconds(30));
+ return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
+ }
+
/**
* Runs a process as specified by the command builder on the container.
* @param processorId id of the samza processor to run (passed as a command line parameter to the process)