You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/11 17:33:02 UTC

[samza] branch master updated: SAMZA-2475: Fix allocated resource expiry bug (#1310)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b8309aa  SAMZA-2475: Fix allocated resource expiry bug (#1310)
b8309aa is described below

commit b8309aa1950ad393cdf54063c2727d78d4f9d0bc
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Wed Mar 11 10:32:49 2020 -0700

    SAMZA-2475: Fix allocated resource expiry bug (#1310)
---
 .../samza/job/yarn/YarnClusterResourceManager.java |  2 +-
 .../job/yarn/TestYarnClusterResourceManager.java   | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 8d23e04..eb97b69 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -580,7 +580,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   public boolean isResourceExpired(SamzaResource resource) {
     // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec (to account for clock skew)
     Duration yarnAllocatedResourceExpiry =
-        Duration.ofMinutes(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+        Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
             .minus(Duration.ofSeconds(30));
     return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
   }
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 042c91c..8f19eab 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import java.time.Duration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -31,7 +32,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaResource;
 import org.apache.samza.config.Config;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -78,4 +81,26 @@ public class TestYarnClusterResourceManager {
     assertEquals(0, yarnAppState.pendingProcessors.size());
     verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
   }
+
+  @Test
+  public void testAllocatedResourceExpiryForYarn() {
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    SamzaResource allocatedResource = mock(SamzaResource.class);
+    when(allocatedResource.getTimestamp()).thenReturn(System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+
+    Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
+  }
 }
\ No newline at end of file