You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/04 22:54:21 UTC

[samza] branch master updated: SAMZA-2475: Add a allocated resource expiry timeout in samza yarn type of apps (#1303)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5c9ef  SAMZA-2475: Add a allocated resource expiry timeout in samza yarn type of apps (#1303)
9d5c9ef is described below

commit 9d5c9ef87e0c27d35e136998f4ef9f1df561cb4c
Author: mynameborat <bh...@apache.org>
AuthorDate: Wed Mar 4 14:54:12 2020 -0800

    SAMZA-2475: Add a allocated resource expiry timeout in samza yarn type of apps (#1303)
---
 .../clustermanager/ClusterResourceManager.java     |  9 +++++
 .../samza/clustermanager/ContainerAllocator.java   |  8 ++++
 .../samza/clustermanager/ContainerManager.java     | 23 +++++++++++
 .../apache/samza/clustermanager/SamzaResource.java | 18 +++++++++
 .../clustermanager/MockClusterResourceManager.java | 12 ++++++
 .../TestContainerAllocatorWithHostAffinity.java    | 45 ++++++++++++++++++++++
 .../samza/job/yarn/YarnClusterResourceManager.java | 10 +++++
 7 files changed, 125 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 276bb4c..8ea3c30 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -132,6 +132,15 @@ public abstract class ClusterResourceManager {
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
+  /**
+   * Checks whether the allocated resource has expired. This base implementation always returns {@code false},
+   * i.e. a {@link ClusterResourceManager} without a concept of allocation expiry treats resources as never expiring.
+   * @param resource allocated resource to check
+   * @return {@code true} if the allocated resource has expired, {@code false} otherwise
+   */
+  public boolean isResourceExpired(SamzaResource resource) {
+    return false;
+  }
 
   /***
    * Defines a callback interface for interacting with notifications from a ClusterResourceManager
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 2e223fc..2661611 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -261,6 +261,14 @@ public class ContainerAllocator implements Runnable {
       throw new SamzaException("Expected resource for Processor ID: " + request.getProcessorId() + " was unavailable on host: " + preferredHost);
     }
 
+    /**
+     * If the allocated resource has expired then release the expired resource and re-request the resources from {@link ClusterResourceManager}
+     */
+    if (clusterResourceManager.isResourceExpired(resource)) {
+      containerManager.handleExpiredResource(request, resource, preferredHost, resourceRequestState, this);
+      return;
+    }
+
     // Update state
     resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
     String processorId = request.getProcessorId();
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 3e3a060..7ba97ee 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -305,6 +305,29 @@ public class ContainerManager {
   }
 
   /**
+   * Handles an expired allocated resource by releasing it, cancelling the pending request and re-requesting the resource.
+   * When a container placement action is active for the processor, the new request is also recorded in its metadata.
+   * @param request pending request for the preferred host
+   * @param resource resource allocated from {@link ClusterResourceManager} which has expired
+   * @param preferredHost host on which container is requested to be deployed
+   * @param resourceRequestState state of request in {@link ContainerAllocator}
+   * @param allocator allocator for requesting resources
+   */
+  void handleExpiredResource(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
+      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
+    LOG.info("Allocated resource {} has expired for Processor ID: {} request: {}. Re-requesting resource again",
+        resource, request.getProcessorId(), request);
+    resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
+    resourceRequestState.cancelResourceRequest(request);
+    SamzaResourceRequest newResourceRequest = allocator.getResourceRequest(request.getProcessorId(), request.getPreferredHost());
+    if (hasActiveContainerPlacementAction(newResourceRequest.getProcessorId())) {
+      ContainerPlacementMetadata metadata = getPlacementActionMetadata(request.getProcessorId()).get();
+      metadata.recordResourceRequest(newResourceRequest);
+    }
+    allocator.issueResourceRequest(newResourceRequest);
+  }
+
+  /**
    * Registers a container placement action to move the running container to destination host, if destination host is same as the
    * host on which container is running, container placement action is treated as a restart.
    *
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index 4d0bf91..30c0902 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.annotations.VisibleForTesting;
+
+
 /**
  * Specification of a Samza Resource. A resource is identified by a unique resource ID.
  * A resource is currently comprised of CPUs and Memory resources on a host.
@@ -28,6 +31,7 @@ public class SamzaResource {
   private final int memoryMb;
   private final String host;
   private final String containerId;
+  private final long timestamp;
 
   //TODO: Investigate adding disk space. Mesos supports disk based reservations.
 
@@ -36,6 +40,16 @@ public class SamzaResource {
     this.memoryMb = memoryMb;
     this.host = host;
     this.containerId = containerId;
+    this.timestamp = System.currentTimeMillis();
+  }
+
+  @VisibleForTesting
+  SamzaResource(int numCores, int memoryMb, String host, String containerId, long timestamp) {
+    this.numCores = numCores;
+    this.memoryMb = memoryMb;
+    this.host = host;
+    this.containerId = containerId;
+    this.timestamp = timestamp; // injected rather than read from the clock so tests can create aged resources
   }
 
   @Override
@@ -82,4 +96,8 @@ public class SamzaResource {
   public String getContainerId() {
     return containerId;
   }
+
+  public long getTimestamp() {
+    return timestamp; // epoch millis: System.currentTimeMillis() at construction unless injected by the test constructor
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index d50ce59..e4be156 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -20,6 +20,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
 import org.apache.samza.job.CommandBuilder;
 import org.junit.Assert;
 
@@ -108,6 +109,13 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     clusterManagerCallback.onResourcesCompleted(statList);
   }
 
+  @Override
+  public boolean isResourceExpired(SamzaResource resource) {
+    long expiryThresholdMs = Duration.ofMinutes(10).minus(Duration.ofSeconds(30)).toMillis();
+    return System.currentTimeMillis() - resource.getTimestamp() > expiryThresholdMs;
+  }
+
+
   public void registerContainerListener(MockContainerListener listener) {
     mockContainerListeners.add(listener);
   }
@@ -116,6 +124,10 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     mockContainerListeners.clear();
   }
 
+  public boolean containsReleasedResource(SamzaResource resource) {
+    return releasedResources.contains(resource); // membership check on releasedResources; used by tests to verify a release
+  }
+
   @Override
   public void stop(SamzaApplicationState.SamzaAppStatus status) {
 
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index bf2eabf..593ddb9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -511,6 +511,51 @@ public class TestContainerAllocatorWithHostAffinity {
     containerAllocator.stop();
   }
 
+  @Test(timeout = 5000)
+  public void testExpiredAllocatedResourcesAreReleased() throws Exception {
+    ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
+    MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
+    ContainerManager spyContainerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false));
+
+    SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
+        System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+    spyAllocator = Mockito.spy(
+        new ContainerAllocator(mockClusterResourceManager, config, state, true, spyContainerManager));
+    spyAllocator.addResource(expiredAllocatedResource);
+    spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "1d1"));
+
+    // Request Preferred Resources
+    spyAllocator.requestResources(new HashMap<String, String>() {
+      {
+        put("0", "host-0");
+        put("1", "host-1");
+      }
+    });
+
+    spyAllocatorThread = new Thread(spyAllocator);
+    // Start the container allocator thread periodic assignment
+    spyAllocatorThread.start();
+
+    // Wait until a third preferred host request is recorded (presumably the re-request for the expired resource)
+    while (state.preferredHostRequests.get() != 3) {
+      Thread.sleep(100);
+    }
+
+    // Verify that handleExpiredResource was invoked once for expired allocated resource
+    ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
+    ArgumentCaptor<SamzaResource> resourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
+    verify(spyContainerManager, times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
+        resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
+    resourceRequestCaptor.getAllValues()
+        .forEach(resourceRequest -> assertEquals("0", resourceRequest.getProcessorId()));
+    resourceArgumentCaptor.getAllValues()
+        .forEach(resource -> assertEquals("host-0", resource.getHost()));
+    // Verify resources were released
+    assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
+    containerAllocator.stop(); // NOTE(review): the thread started above runs spyAllocator; confirm it is stopped elsewhere
+  }
+
   //@Test
   public void testExpiryWithNonResponsiveClusterManager() throws Exception {
 
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 43c49cc..8d23e04 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import java.time.Duration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -575,6 +576,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     }
   }
 
+  @Override
+  public boolean isResourceExpired(SamzaResource resource) {
+    // Expired once time since allocation > Yarn expiry interval (a millisecond value) - 30 sec (to account for clock skew)
+    Duration yarnAllocatedResourceExpiry =
+        Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+            .minus(Duration.ofSeconds(30));
+    return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
+  }
+
   /**
    * Runs a process as specified by the command builder on the container.
    * @param processorId id of the samza processor to run (passed as a command line parameter to the process)