You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/01/11 13:41:23 UTC

[ignite-3] branch main updated: IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 16aed16  IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky
16aed16 is described below

commit 16aed16be2acbf52f7179c0cf8ae47ebf6cbc8b2
Author: sanpwc <al...@gridgain.com>
AuthorDate: Tue Jan 11 12:28:00 2022 +0300

    IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky
---
 .../distributed/ItInternalTableScanTest.java       | 174 ++++++++++++---------
 1 file changed, 101 insertions(+), 73 deletions(-)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 1eddc47..ebb727d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.distributed;
 
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -36,13 +35,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -257,70 +256,25 @@ public class ItInternalTableScanTest {
     }
 
     /**
-     * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closes in case of invalid requested amount of
+     * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closes in case of negative requested amount of
      * items.
      *
      * @throws Exception If any.
      */
     @Test()
-    public void testInvalidRequestedAmountScan() throws Exception {
-        AtomicBoolean cursorClosed = new AtomicBoolean(false);
-
-        when(mockStorage.scan(any())).thenAnswer(invocation -> {
-            var cursor = mock(Cursor.class);
-
-            doAnswer(
-                    invocationClose -> {
-                        cursorClosed.set(true);
-                        return null;
-                    }
-            ).when(cursor).close();
-
-            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
-                throw new StorageException("test");
-            });
-
-            return cursor;
-        });
-
-        for (long n : new long[]{-1, 0}) {
-            AtomicReference<Throwable> gotException = new AtomicReference<>();
-
-            cursorClosed.set(false);
-
-            internalTbl.scan(0, null).subscribe(new Subscriber<>() {
-                @Override
-                public void onSubscribe(Subscription subscription) {
-                    subscription.request(n);
-                }
-
-                @Override
-                public void onNext(BinaryRow item) {
-                    fail("Should never get here.");
-                }
-
-                @Override
-                public void onError(Throwable throwable) {
-                    gotException.set(throwable);
-                }
-
-                @Override
-                public void onComplete() {
-                    fail("Should never get here.");
-                }
-            });
-
-            assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
-
-            assertTrue(waitForCondition(cursorClosed::get, 1_000));
+    public void testNegativeRequestedAmountScan() throws Exception {
+        invalidRequestNtest(-1);
+    }
 
-            assertThrows(
-                    IllegalArgumentException.class,
-                    () -> {
-                        throw gotException.get();
-                    }
-            );
-        }
+    /**
+     * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closes in case of zero requested amount of
+     * items.
+     *
+     * @throws Exception If any.
+     */
+    @Test()
+    public void testZeroRequestedAmountScan() throws Exception {
+        invalidRequestNtest(0);
     }
 
     /**
@@ -328,9 +282,11 @@ public class ItInternalTableScanTest {
      */
     @Test
     public void testExceptionRowScanCursorHasNext() throws Exception {
-        AtomicReference<Throwable> gotException = new AtomicReference<>();
+        // The latch that allows to await Subscriber.onComplete() before asserting test invariants
+        // and avoids the race between closing the cursor and stopping the node.
+        CountDownLatch subscriberFinishedLatch = new CountDownLatch(2);
 
-        AtomicBoolean cursorClosed = new AtomicBoolean(false);
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
 
         when(mockStorage.scan(any())).thenAnswer(invocation -> {
             var cursor = mock(Cursor.class);
@@ -343,7 +299,7 @@ public class ItInternalTableScanTest {
 
             doAnswer(
                     invocationClose -> {
-                        cursorClosed.set(true);
+                        subscriberFinishedLatch.countDown();
                         return null;
                     }
             ).when(cursor).close();
@@ -366,6 +322,7 @@ public class ItInternalTableScanTest {
             @Override
             public void onError(Throwable throwable) {
                 gotException.set(throwable);
+                subscriberFinishedLatch.countDown();
             }
 
             @Override
@@ -374,11 +331,9 @@ public class ItInternalTableScanTest {
             }
         });
 
-        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+        subscriberFinishedLatch.await();
 
         assertEquals(gotException.get().getCause().getClass(), NoSuchElementException.class);
-
-        assertTrue(waitForCondition(cursorClosed::get, 1_000));
     }
 
     /**
@@ -386,6 +341,9 @@ public class ItInternalTableScanTest {
      */
     @Test
     public void testExceptionRowScan() throws Exception {
+        // The latch that allows to await Subscriber.onError() before asserting test invariants.
+        CountDownLatch gotExceptionLatch = new CountDownLatch(1);
+
         AtomicReference<Throwable> gotException = new AtomicReference<>();
 
         when(mockStorage.scan(any())).thenThrow(new StorageException("Some storage exception"));
@@ -405,6 +363,7 @@ public class ItInternalTableScanTest {
             @Override
             public void onError(Throwable throwable) {
                 gotException.set(throwable);
+                gotExceptionLatch.countDown();
             }
 
             @Override
@@ -413,7 +372,7 @@ public class ItInternalTableScanTest {
             }
         });
 
-        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+        gotExceptionLatch.await();
 
         assertEquals(gotException.get().getCause().getClass(), StorageException.class);
     }
@@ -466,6 +425,9 @@ public class ItInternalTableScanTest {
             }
         });
 
+        // The latch that allows to await Subscriber.onError() before asserting test invariants.
+        CountDownLatch gotExceptionLatch = new CountDownLatch(1);
+
         AtomicReference<Throwable> gotException = new AtomicReference<>();
 
         scan.subscribe(new Subscriber<>() {
@@ -482,6 +444,7 @@ public class ItInternalTableScanTest {
             @Override
             public void onError(Throwable throwable) {
                 gotException.set(throwable);
+                gotExceptionLatch.countDown();
             }
 
             @Override
@@ -490,7 +453,7 @@ public class ItInternalTableScanTest {
             }
         });
 
-        assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+        gotExceptionLatch.await();
 
         assertEquals(gotException.get().getClass(), IllegalStateException.class);
     }
@@ -529,7 +492,7 @@ public class ItInternalTableScanTest {
     /**
      * Checks whether publisher provides all existing data and then completes if requested by reqAmount rows at a time.
      *
-     * @param submittedItems Items to be pushed by ublisher.
+     * @param submittedItems Items to be pushed by publisher.
      * @param reqAmount      Amount of rows to request at a time.
      * @throws Exception If Any.
      */
@@ -548,7 +511,8 @@ public class ItInternalTableScanTest {
             return cursor;
         });
 
-        AtomicBoolean noMoreData = new AtomicBoolean(false);
+        // The latch that allows to await Subscriber.onError() before asserting test invariants.
+        CountDownLatch subscriberAllDataAwaitLatch = new CountDownLatch(1);
 
         internalTbl.scan(0, null).subscribe(new Subscriber<>() {
             private Subscription subscription;
@@ -576,11 +540,13 @@ public class ItInternalTableScanTest {
 
             @Override
             public void onComplete() {
-                noMoreData.set(true);
+                subscriberAllDataAwaitLatch.countDown();
             }
         });
 
-        assertTrue(waitForCondition(() -> retrievedItems.size() == submittedItems.size(), 2_000));
+        subscriberAllDataAwaitLatch.await();
+
+        assertEquals(submittedItems.size(), retrievedItems.size());
 
         List<byte[]> expItems = submittedItems.stream().map(DataRow::valueBytes).collect(Collectors.toList());
         List<byte[]> gotItems = retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
@@ -588,7 +554,69 @@ public class ItInternalTableScanTest {
         for (int i = 0; i < expItems.size(); i++) {
             assertTrue(Arrays.equals(expItems.get(i), gotItems.get(i)));
         }
+    }
+
+    /**
+     * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closes in case of invalid requested amount of
+     * items.
+     *
+     * @param reqAmount  Amount of rows to request at a time.
+     * @throws Exception If Any.
+     */
+    private void invalidRequestNtest(int reqAmount) throws InterruptedException {
+        // The latch that allows to await Subscriber.onComplete() before asserting test invariants
+        // and avoids the race between closing the cursor and stopping the node.
+        CountDownLatch subscriberFinishedLatch = new CountDownLatch(2);
+
+        when(mockStorage.scan(any())).thenAnswer(invocation -> {
+            var cursor = mock(Cursor.class);
+
+            doAnswer(
+                    invocationClose -> {
+                        subscriberFinishedLatch.countDown();
+                        return null;
+                    }
+            ).when(cursor).close();
 
-        assertTrue(noMoreData.get(), "More data is not expected.");
+            when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+                throw new StorageException("test");
+            });
+
+            return cursor;
+        });
+
+        AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+        internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(reqAmount);
+            }
+
+            @Override
+            public void onNext(BinaryRow item) {
+                fail("Should never get here.");
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                gotException.set(throwable);
+                subscriberFinishedLatch.countDown();
+            }
+
+            @Override
+            public void onComplete() {
+                fail("Should never get here.");
+            }
+        });
+
+        subscriberFinishedLatch.await();
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    throw gotException.get();
+                }
+        );
     }
 }