You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/11 17:33:02 UTC
[samza] branch master updated: SAMZA-2475: Fix allocated resource expiry bug (#1310)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b8309aa SAMZA-2475: Fix allocated resource expiry bug (#1310)
b8309aa is described below
commit b8309aa1950ad393cdf54063c2727d78d4f9d0bc
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Wed Mar 11 10:32:49 2020 -0700
SAMZA-2475: Fix allocated resource expiry bug (#1310)
---
.../samza/job/yarn/YarnClusterResourceManager.java | 2 +-
.../job/yarn/TestYarnClusterResourceManager.java | 25 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 8d23e04..eb97b69 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -580,7 +580,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
public boolean isResourceExpired(SamzaResource resource) {
// Time from which resource was allocated > Yarn Expiry Timeout - 30 sec (to account for clock skew)
Duration yarnAllocatedResourceExpiry =
- Duration.ofMinutes(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+ Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
.minus(Duration.ofSeconds(30));
return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 042c91c..8f19eab 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.job.yarn;
+import java.time.Duration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -31,7 +32,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.config.Config;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -78,4 +81,26 @@ public class TestYarnClusterResourceManager {
assertEquals(0, yarnAppState.pendingProcessors.size());
verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
}
+
+ @Test
+ public void testAllocatedResourceExpiryForYarn() {
+ YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+ SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+ Config config = mock(Config.class);
+ AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+ YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+ SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+ SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+ NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+ ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+ // start the cluster manager
+ YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+ callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+ SamzaResource allocatedResource = mock(SamzaResource.class);
+ when(allocatedResource.getTimestamp()).thenReturn(System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+
+ Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
+ }
}
\ No newline at end of file