You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/11/16 19:49:47 UTC

[samza] branch master updated: [Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f6530  [Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)
39f6530 is described below

commit 39f65300e5dd3c9e3b6498d213383003cbd5b3c7
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue Nov 16 11:49:43 2021 -0800

    [Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)
---
 .../TestContainerAllocatorWithHostAffinity.java    | 33 ++++++++++++++--------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2c79bb1..3ec0f14 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -432,27 +433,37 @@ public class TestContainerAllocatorWithHostAffinity {
     // Start the container allocator thread periodic assignment
     spyAllocatorThread.start();
 
-    // Let the request expire, expiration timeout is 3 ms
-    Thread.sleep(1000);
+    // Let the preferred host requests and the follow-up ANY_HOST request expire, expiration timeout is 500 ms
+    Thread.sleep(1500);
 
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+    // expirations for initial preferred host requests
+    verify(spyContainerManager).handleExpiredRequest(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
-    verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+    verify(spyContainerManager).handleExpiredRequest(eq("1"), eq("hostname-1"),
+        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    // expirations for follow-up ANY_HOST requests
+    // allocator keeps running in a loop so it might expire more than once during the wait time
+    verify(spyContainerManager, atLeast(1)).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
+        any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+    verify(spyContainerManager, atLeast(1)).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
         any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
     // requestResource was invoked with ANY_HOST requests
     ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
-    // At least 2 preferred host requests were cancelled
-    verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
-    assertTrue(cancelledRequestCaptor.getAllValues().stream().map(resourceRequest -> resourceRequest.getPreferredHost()).collect(
-        Collectors.toSet()).size() > 2);
-    // Check that atleast 2 ANY_HOST requests were made
+    // preferred host requests and ANY_HOST requests got cancelled
+    verify(spyManager, atLeast(4)).cancelResourceRequest(cancelledRequestCaptor.capture());
+    assertEquals(ImmutableSet.of("hostname-0", "hostname-1", ResourceRequestState.ANY_HOST),
+        cancelledRequestCaptor.getAllValues()
+            .stream()
+            .map(SamzaResourceRequest::getPreferredHost)
+            .collect(Collectors.toSet()));
     assertTrue(state.matchedResourceRequests.get() == 0);
-    assertTrue(state.anyHostRequests.get() > 2);
+    // at least 2 for expired preferred host requests, 2 for expired follow-up ANY_HOST requests
+    assertTrue(state.anyHostRequests.get() >= 4);
     spyAllocator.stop();
   }
 
@@ -480,7 +491,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // Start the container allocator thread periodic assignment
     spyAllocatorThread.start();
 
-    // Let the request expire, expiration timeout is 3 ms
+    // Let the request expire, expiration timeout is 500 ms
     Thread.sleep(1000);
 
     // Verify that all the request that were created as preferred host requests expired