You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/13 17:11:01 UTC
[samza] branch master updated: Fix flaky tests for Container Placements (#1314)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c864c4c Fix flaky tests for Container Placements (#1314)
c864c4c is described below
commit c864c4cc1abcc9a5a88f6b0dbf493a096e40bce6
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Fri Mar 13 10:10:52 2020 -0700
Fix flaky tests for Container Placements (#1314)
---
.../TestContainerPlacementActions.java | 57 ++++++++++++++++------
1 file changed, 42 insertions(+), 15 deletions(-)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index ebe4c72..18e6cb4 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -94,7 +94,7 @@ public class TestContainerPlacementActions {
private CoordinatorStreamStore coordinatorStreamStore;
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
- private SamzaApplicationState state;
+ volatile private SamzaApplicationState state;
private ContainerManager containerManager;
private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
private ContainerProcessManager cpm;
@@ -238,8 +238,17 @@ public class TestContainerPlacementActions {
fail("timed out waiting for the containers to start");
}
- while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+ containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ // Wait for the placement action to be complete & get written to the underlying metastore
+ while (true) {
+ if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent()
+ && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ break;
+ }
Thread.sleep(100);
+ responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
}
assertEquals(state.preferredHostRequests.get(), 3);
@@ -249,9 +258,6 @@ public class TestContainerPlacementActions {
assertEquals(state.anyHostRequests.get(), 0);
assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
- Optional<ContainerPlacementResponseMessage> responseMessage =
- containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
assertTrue(responseMessage.isPresent());
assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
assertResponseMessage(responseMessage.get(), requestMessage);
@@ -411,8 +417,19 @@ public class TestContainerPlacementActions {
ContainerPlacementMetadata metadata =
containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity);
- while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.FAILED) {
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+ containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ // Wait for the placement action to be complete & get written to the underlying metastore
+ while (true) {
+ if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.FAILED
+ && responseMessage.isPresent()
+ && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.FAILED) {
+ break;
+ }
Thread.sleep(100);
+ responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
}
assertEquals(state.preferredHostRequests.get(), 3);
@@ -422,9 +439,6 @@ public class TestContainerPlacementActions {
assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
assertEquals(state.anyHostRequests.get(), 0);
- Optional<ContainerPlacementResponseMessage> responseMessage =
- containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
assertTrue(responseMessage.isPresent());
assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.FAILED);
assertResponseMessage(responseMessage.get(), requestMessage);
@@ -723,8 +737,16 @@ public class TestContainerPlacementActions {
fail("timed out waiting for the containers to start");
}
- while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+ containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ while (true) {
+ if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent()
+ && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ break;
+ }
Thread.sleep(100);
+ responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
}
// We should have no preferred host request
@@ -739,9 +761,6 @@ public class TestContainerPlacementActions {
// Action should success
assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, metadata.getActionStatus());
- Optional<ContainerPlacementResponseMessage> responseMessage =
- containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
assertTrue(responseMessage.isPresent());
assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
assertResponseMessage(responseMessage.get(), requestMessage);
@@ -967,7 +986,7 @@ public class TestContainerPlacementActions {
}
private void assertBadRequests(String processorId, String destinationHost, ContainerManager containerManager,
- ContainerAllocator allocator) {
+ ContainerAllocator allocator) throws InterruptedException {
ContainerPlacementRequestMessage requestMessage =
new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-Attemp-001", processorId, destinationHost,
System.currentTimeMillis());
@@ -978,7 +997,15 @@ public class TestContainerPlacementActions {
Optional<ContainerPlacementResponseMessage> responseMessage =
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
- assertTrue(responseMessage.isPresent());
+ while (true) {
+ if (responseMessage.isPresent()
+ && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+ break;
+ }
+ Thread.sleep(100);
+ responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+ }
+
assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST);
assertResponseMessage(responseMessage.get(), requestMessage);
// Request shall be deleted as soon as it is acted upon