You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/03/13 17:11:01 UTC

[samza] branch master updated: Fix flaky tests for Container Placements (#1314)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c864c4c  Fix flaky tests for Container Placements (#1314)
c864c4c is described below

commit c864c4cc1abcc9a5a88f6b0dbf493a096e40bce6
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Fri Mar 13 10:10:52 2020 -0700

    Fix flaky tests for Container Placements (#1314)
---
 .../TestContainerPlacementActions.java             | 57 ++++++++++++++++------
 1 file changed, 42 insertions(+), 15 deletions(-)

diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index ebe4c72..18e6cb4 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -94,7 +94,7 @@ public class TestContainerPlacementActions {
   private CoordinatorStreamStore coordinatorStreamStore;
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
 
-  private SamzaApplicationState state;
+  volatile private SamzaApplicationState state;
   private ContainerManager containerManager;
   private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
   private ContainerProcessManager cpm;
@@ -238,8 +238,17 @@ public class TestContainerPlacementActions {
       fail("timed out waiting for the containers to start");
     }
 
-    while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    // Wait for the placement action to be complete & get written to the underlying metastore
+    while (true) {
+      if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent()
+          && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+        break;
+      }
       Thread.sleep(100);
+      responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
     }
 
     assertEquals(state.preferredHostRequests.get(), 3);
@@ -249,9 +258,6 @@ public class TestContainerPlacementActions {
     assertEquals(state.anyHostRequests.get(), 0);
     assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
 
-    Optional<ContainerPlacementResponseMessage> responseMessage =
-        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
     assertTrue(responseMessage.isPresent());
     assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
     assertResponseMessage(responseMessage.get(), requestMessage);
@@ -411,8 +417,19 @@ public class TestContainerPlacementActions {
     ContainerPlacementMetadata metadata =
         containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity);
 
-    while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.FAILED) {
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    // Wait for the placement action to be complete & get written to the underlying metastore
+    while (true) {
+      if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.FAILED
+          && responseMessage.isPresent()
+          && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.FAILED) {
+        break;
+      }
       Thread.sleep(100);
+      responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
     }
 
     assertEquals(state.preferredHostRequests.get(), 3);
@@ -422,9 +439,6 @@ public class TestContainerPlacementActions {
     assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
     assertEquals(state.anyHostRequests.get(), 0);
 
-    Optional<ContainerPlacementResponseMessage> responseMessage =
-        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
     assertTrue(responseMessage.isPresent());
     assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.FAILED);
     assertResponseMessage(responseMessage.get(), requestMessage);
@@ -723,8 +737,16 @@ public class TestContainerPlacementActions {
       fail("timed out waiting for the containers to start");
     }
 
-    while (metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    while (true) {
+      if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent()
+          && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+        break;
+      }
       Thread.sleep(100);
+      responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
     }
 
     // We should have no preferred host request
@@ -739,9 +761,6 @@ public class TestContainerPlacementActions {
     // Action should success
     assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, metadata.getActionStatus());
 
-    Optional<ContainerPlacementResponseMessage> responseMessage =
-        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
-
     assertTrue(responseMessage.isPresent());
     assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
     assertResponseMessage(responseMessage.get(), requestMessage);
@@ -967,7 +986,7 @@ public class TestContainerPlacementActions {
   }
 
   private void assertBadRequests(String processorId, String destinationHost, ContainerManager containerManager,
-      ContainerAllocator allocator) {
+      ContainerAllocator allocator) throws InterruptedException {
     ContainerPlacementRequestMessage requestMessage =
         new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-Attemp-001", processorId, destinationHost,
             System.currentTimeMillis());
@@ -978,7 +997,15 @@ public class TestContainerPlacementActions {
     Optional<ContainerPlacementResponseMessage> responseMessage =
         containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
 
-    assertTrue(responseMessage.isPresent());
+    while (true) {
+      if (responseMessage.isPresent()
+          && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+        break;
+      }
+      Thread.sleep(100);
+      responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+    }
+
     assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST);
     assertResponseMessage(responseMessage.get(), requestMessage);
     // Request shall be deleted as soon as it is acted upon