You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/06/08 21:36:52 UTC

[samza] branch 1.5.0 updated: SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.5.0 by this push:
     new 1a05c48  SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
1a05c48 is described below

commit 1a05c487650fa3ba9fc568a8f2b0a08ca31b6ac6
Author: mynameborat <bh...@gmail.com>
AuthorDate: Mon Jun 8 14:36:29 2020 -0700

    SAMZA-2545: Fix testBatchOperationTriggeredByBatchSize flakiness
    
    **Problem**: The test sleeps for a fixed duration and expects the futures to have completed by the time the sleep ends. This timing dependence makes the test flaky and causes spurious failures.
    **Change**: Modified the test to use a latch instead of sleep
    **Tests**: None
    **API Changes**: None
    **Upgrade Instructions**: None
    **Usage Instructions**: None
    
    Author: mynameborat <bh...@gmail.com>
    
    Reviewers: Dengpanyin <dy...@linkedin.com>
    
    Closes #1379 from mynameborat/SAMZA-2545
    
    (cherry picked from commit 89538a915d92e153dd3ff221f2abc9719dd69982)
    Signed-off-by: mynameborat <bh...@gmail.com>
---
 .../samza/table/batching/TestBatchProcessor.java   | 44 +++++++++++-----------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
index 2de3170..44ea246 100644
--- a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -23,25 +23,21 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 import org.apache.samza.table.ReadWriteTable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static java.lang.Thread.*;
-import static org.mockito.Mockito.*;
+import static java.lang.Thread.sleep;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 
 public class TestBatchProcessor {
-  private static final int SLOW_OPERATION_TIME_MS = 500;
-  private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
-    try {
-      sleep(SLOW_OPERATION_TIME_MS);
-    } catch (InterruptedException e) {
-      // ignore
-    }
-    return null;
-  };
 
   public static class TestCreate {
     @Test
@@ -86,9 +82,18 @@ public class TestBatchProcessor {
     @Test
     public void testBatchOperationTriggeredByBatchSize() {
       final int maxBatchSize = 3;
+      final CountDownLatch batchCompletionTriggerLatch = new CountDownLatch(1);
+      final Supplier<Void> tableUpdateSupplier = () -> {
+        try {
+          batchCompletionTriggerLatch.await();
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        return null;
+      };
 
       final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
-      when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+      when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(tableUpdateSupplier));
 
       final BatchProcessor<Integer, Integer> batchProcessor =
           createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
@@ -104,15 +109,12 @@ public class TestBatchProcessor {
       }
       Assert.assertEquals(0, batchProcessor.size());
 
-      try {
-        sleep(SLOW_OPERATION_TIME_MS * 2);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-
-      for (int i = 0; i < maxBatchSize; i++) {
-        Assert.assertTrue(futureList.get(i).isDone());
-      }
+      // Complete the async call to the underlying table
+      batchCompletionTriggerLatch.countDown();
+      // The latch should eventually trigger completion to the future returned by the batch processor
+      CompletableFuture
+          .allOf(futureList.toArray(new CompletableFuture[0]))
+          .join();
     }
 
     @Test