You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/06/08 21:36:52 UTC
[samza] branch 1.5.0 updated: SAMZA-2545: Fix
testBatchOperationTriggeredByBatchSize flakiness
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.5.0 by this push:
new 1a05c48 SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
1a05c48 is described below
commit 1a05c487650fa3ba9fc568a8f2b0a08ca31b6ac6
Author: mynameborat <bh...@gmail.com>
AuthorDate: Mon Jun 8 14:36:29 2020 -0700
SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
**Problem**: The test uses sleep and expects the future to complete at the end of the sleep duration. It introduces flakiness and results in false negatives.
**Change**: Modified the tests to use latch instead of sleep
**Tests**: None
**API Changes**: None
**Upgrade Instructions**: None
**Usage Instructions**: None
Author: mynameborat <bh...@gmail.com>
Reviewers: Dengpanyin <dy...@linkedin.com>
Closes #1379 from mynameborat/SAMZA-2545
(cherry picked from commit 89538a915d92e153dd3ff221f2abc9719dd69982)
Signed-off-by: mynameborat <bh...@gmail.com>
---
.../samza/table/batching/TestBatchProcessor.java | 44 +++++++++++-----------
1 file changed, 23 insertions(+), 21 deletions(-)
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
index 2de3170..44ea246 100644
--- a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -23,25 +23,21 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.samza.table.ReadWriteTable;
import org.junit.Assert;
import org.junit.Test;
-import static java.lang.Thread.*;
-import static org.mockito.Mockito.*;
+import static java.lang.Thread.sleep;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestBatchProcessor {
- private static final int SLOW_OPERATION_TIME_MS = 500;
- private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
- try {
- sleep(SLOW_OPERATION_TIME_MS);
- } catch (InterruptedException e) {
- // ignore
- }
- return null;
- };
public static class TestCreate {
@Test
@@ -86,9 +82,18 @@ public class TestBatchProcessor {
@Test
public void testBatchOperationTriggeredByBatchSize() {
final int maxBatchSize = 3;
+ final CountDownLatch batchCompletionTriggerLatch = new CountDownLatch(1);
+ final Supplier<Void> tableUpdateSupplier = () -> {
+ try {
+ batchCompletionTriggerLatch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ return null;
+ };
final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
- when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+ when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(tableUpdateSupplier));
final BatchProcessor<Integer, Integer> batchProcessor =
createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
@@ -104,15 +109,12 @@ public class TestBatchProcessor {
}
Assert.assertEquals(0, batchProcessor.size());
- try {
- sleep(SLOW_OPERATION_TIME_MS * 2);
- } catch (InterruptedException e) {
- // ignore
- }
-
- for (int i = 0; i < maxBatchSize; i++) {
- Assert.assertTrue(futureList.get(i).isDone());
- }
+ // Complete the async call to the underlying table
+ batchCompletionTriggerLatch.countDown();
+ // The latch should eventually trigger completion to the future returned by the batch processor
+ CompletableFuture
+ .allOf(futureList.toArray(new CompletableFuture[0]))
+ .join();
}
@Test