You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/11/16 19:49:47 UTC
[samza] branch master updated: [Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 39f6530 [Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)
39f6530 is described below
commit 39f65300e5dd3c9e3b6498d213383003cbd5b3c7
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Tue Nov 16 11:49:43 2021 -0800
[Tests] fix flaky TestContainerAllocatorWithHostAffinity.testExpiredRequestAllocationOnAnyHost (#1559)
---
.../TestContainerAllocatorWithHostAffinity.java | 33 ++++++++++++++--------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2c79bb1..3ec0f14 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -432,27 +433,37 @@ public class TestContainerAllocatorWithHostAffinity {
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
- // Let the request expire, expiration timeout is 3 ms
- Thread.sleep(1000);
+ // Let the preferred host requests and the follow-up ANY_HOST request expire, expiration timeout is 500 ms
+ Thread.sleep(1500);
// Verify that all the requests that were created as preferred host requests expired
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.expiredPreferredHostRequests.get() == 2);
- verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+ // expirations for initial preferred host requests
+ verify(spyContainerManager).handleExpiredRequest(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
- verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+ verify(spyContainerManager).handleExpiredRequest(eq("1"), eq("hostname-1"),
+ any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+ // expirations for follow-up ANY_HOST requests
+ // allocator keeps running in a loop so it might expire more than once during the wait time
+ verify(spyContainerManager, atLeast(1)).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
+ any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
+ verify(spyContainerManager, atLeast(1)).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
// Verify that preferred host requests were cancelled and, since no surplus resources were available,
// requestResource was invoked with ANY_HOST requests
ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
- // At least 2 preferred host requests were cancelled
- verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
- assertTrue(cancelledRequestCaptor.getAllValues().stream().map(resourceRequest -> resourceRequest.getPreferredHost()).collect(
- Collectors.toSet()).size() > 2);
- // Check that atleast 2 ANY_HOST requests were made
+ // preferred host requests and ANY_HOST requests got cancelled
+ verify(spyManager, atLeast(4)).cancelResourceRequest(cancelledRequestCaptor.capture());
+ assertEquals(ImmutableSet.of("hostname-0", "hostname-1", ResourceRequestState.ANY_HOST),
+ cancelledRequestCaptor.getAllValues()
+ .stream()
+ .map(SamzaResourceRequest::getPreferredHost)
+ .collect(Collectors.toSet()));
assertTrue(state.matchedResourceRequests.get() == 0);
- assertTrue(state.anyHostRequests.get() > 2);
+ // at least 2 for expired preferred host requests, 2 for expired follow-up ANY_HOST requests
+ assertTrue(state.anyHostRequests.get() >= 4);
spyAllocator.stop();
}
@@ -480,7 +491,7 @@ public class TestContainerAllocatorWithHostAffinity {
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
- // Let the request expire, expiration timeout is 3 ms
+ // Let the request expire, expiration timeout is 500 ms
Thread.sleep(1000);
// Verify that all the requests that were created as preferred host requests expired