You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/06/08 22:35:34 UTC

[samza] branch 1.5.0 updated: SAMZA-2504: Improve Container Placement Flaky Test & Running Time

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.5.0 by this push:
     new 6fe7efd  SAMZA-2504: Improve Container Placement Flaky Test & Running Time
6fe7efd is described below

commit 6fe7efd7d2e186a266d3c04395d94db001c1f7f8
Author: Sanil15 <sa...@gmail.com>
AuthorDate: Mon Jun 8 15:35:03 2020 -0700

    SAMZA-2504: Improve Container Placement Flaky Test & Running Time
    
    Improvement [Bug fix]:
    
    - Fix a flaky Container Placement test that checks the placement request status
    - Reduce the running time of the test suite from ~40 seconds to under 4 seconds
    
    API changes: None
    
    Upgrade Instructions: None
    
    Usage Instructions: None
    
    Author: Sanil15 <sa...@gmail.com>
    
    Reviewers: mynameborat <bh...@apache.org>
    
    Closes #1376 from Sanil15/SAMZA-2504
    
    (cherry picked from commit bcee407801d5966015009e9c1d0337ce7e13fc96)
    Signed-off-by: mynameborat <bh...@gmail.com>
---
 .../ContainerPlacementRequestAllocator.java        | 24 ++++++++++++++++++++--
 .../TestContainerPlacementActions.java             | 22 ++++++++++++--------
 2 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
index 5161cfb..7eb6175 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager.container.placement;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.samza.clustermanager.ContainerProcessManager;
 import org.apache.samza.config.ApplicationConfig;
@@ -50,7 +51,10 @@ public class ContainerPlacementRequestAllocator implements Runnable {
    * RunId of the app
    */
   private final String appRunId;
-
+  /**
+   * Sleep time for container placement handler thread
+   */
+  private final int containerPlacementHandlerSleepMs;
   public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config) {
     Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
     Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be null");
@@ -58,6 +62,22 @@ public class ContainerPlacementRequestAllocator implements Runnable {
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
     this.isRunning = true;
     this.appRunId = config.getRunId();
+    this.containerPlacementHandlerSleepMs = DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS;
+  }
+
+  @VisibleForTesting
+  /**
+   * Should only be used for testing. It cannot be package-private because the end-to-end integration test must live
+   * in org.apache.samza.clustermanager (to use package-private methods there), which is outside this package.
+   */
+  public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore containerPlacementMetadataStore, ContainerProcessManager manager, ApplicationConfig config, int containerPlacementHandlerSleepMs) {
+    Preconditions.checkNotNull(containerPlacementMetadataStore, "containerPlacementMetadataStore cannot be null");
+    Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be null");
+    this.containerProcessManager = manager;
+    this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+    this.isRunning = true;
+    this.appRunId = config.getRunId();
+    this.containerPlacementHandlerSleepMs = containerPlacementHandlerSleepMs;
   }
 
   @Override
@@ -75,7 +95,7 @@ public class ContainerPlacementRequestAllocator implements Runnable {
             containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
           }
         }
-        Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
+        Thread.sleep(containerPlacementHandlerSleepMs);
       } catch (InterruptedException e) {
         LOG.warn("Got InterruptedException in ContainerPlacementRequestAllocator thread.", e);
         Thread.currentThread().interrupt();
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 87bf85e..49b013d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -64,6 +64,10 @@ import static org.mockito.Mockito.*;
 
 /**
  * Set of Integration tests for container placement actions
+ *
+ * Note that semaphores are used wherever possible; a few Thread.sleep calls remain where the main thread polls for
+ * state changes in atomic variables or synchronized metadata objects, because semaphores are difficult to plug into
+ * those pieces of logic.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class TestContainerPlacementActions {
@@ -275,7 +279,7 @@ public class TestContainerPlacementActions {
   public void testActionQueuingForConsecutivePlacementActions() throws Exception {
     // Spawn a Request Allocator Thread
     ContainerPlacementRequestAllocator requestAllocator =
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
     Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
 
     requestAllocatorThread.start();
@@ -345,7 +349,7 @@ public class TestContainerPlacementActions {
               == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     assertEquals(state.preferredHostRequests.get(), 4);
@@ -647,8 +651,9 @@ public class TestContainerPlacementActions {
       fail("timed out waiting for the containers to start");
     }
 
-    // Wait for both the containers to be in running state
-    while (state.runningProcessors.size() != 2) {
+    // Wait for both the containers to be in running state & control action metadata to succeed
+    while (state.runningProcessors.size() != 2
+        && metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) {
       Thread.sleep(100);
     }
 
@@ -660,8 +665,6 @@ public class TestContainerPlacementActions {
     assertEquals(state.anyHostRequests.get(), 0);
     // Failed processors must be empty
     assertEquals(state.failedProcessors.size(), 0);
-    // Control Action should be success in this case
-    assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED);
   }
 
   @Test(timeout = 10000)
@@ -850,8 +853,9 @@ public class TestContainerPlacementActions {
 
     // Spawn a Request Allocator Thread
     ContainerPlacementRequestAllocator requestAllocator =
-        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config));
+        new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100);
     Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread");
+
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -923,7 +927,7 @@ public class TestContainerPlacementActions {
               == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     // App running state should remain the same
@@ -960,7 +964,7 @@ public class TestContainerPlacementActions {
               == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
         break;
       }
-      Thread.sleep(Duration.ofSeconds(5).toMillis());
+      Thread.sleep(100);
     }
 
     assertEquals(4, state.runningProcessors.size());