You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/01/11 13:41:23 UTC
[ignite-3] branch main updated: IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 16aed16 IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky
16aed16 is described below
commit 16aed16be2acbf52f7179c0cf8ae47ebf6cbc8b2
Author: sanpwc <al...@gridgain.com>
AuthorDate: Tue Jan 11 12:28:00 2022 +0300
IGNITE-16256 ItInternalTableScanTest.testMultipleRowScan is flaky
---
.../distributed/ItInternalTableScanTest.java | 174 ++++++++++++---------
1 file changed, 101 insertions(+), 73 deletions(-)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 1eddc47..ebb727d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.distributed;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -36,13 +35,13 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -257,70 +256,25 @@ public class ItInternalTableScanTest {
}
/**
- * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closes in case of invalid requested amount of
+ * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closed in case of negative requested amount of
* items.
*
* @throws Exception If any.
*/
@Test()
- public void testInvalidRequestedAmountScan() throws Exception {
- AtomicBoolean cursorClosed = new AtomicBoolean(false);
-
- when(mockStorage.scan(any())).thenAnswer(invocation -> {
- var cursor = mock(Cursor.class);
-
- doAnswer(
- invocationClose -> {
- cursorClosed.set(true);
- return null;
- }
- ).when(cursor).close();
-
- when(cursor.hasNext()).thenAnswer(hnInvocation -> {
- throw new StorageException("test");
- });
-
- return cursor;
- });
-
- for (long n : new long[]{-1, 0}) {
- AtomicReference<Throwable> gotException = new AtomicReference<>();
-
- cursorClosed.set(false);
-
- internalTbl.scan(0, null).subscribe(new Subscriber<>() {
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(n);
- }
-
- @Override
- public void onNext(BinaryRow item) {
- fail("Should never get here.");
- }
-
- @Override
- public void onError(Throwable throwable) {
- gotException.set(throwable);
- }
-
- @Override
- public void onComplete() {
- fail("Should never get here.");
- }
- });
-
- assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
-
- assertTrue(waitForCondition(cursorClosed::get, 1_000));
+ public void testNegativeRequestedAmountScan() throws Exception {
+ invalidRequestNtest(-1);
+ }
- assertThrows(
- IllegalArgumentException.class,
- () -> {
- throw gotException.get();
- }
- );
- }
+ /**
+ * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closed in case of zero requested amount of
+ * items.
+ *
+ * @throws Exception If any.
+ */
+ @Test()
+ public void testZeroRequestedAmountScan() throws Exception {
+ invalidRequestNtest(0);
}
/**
@@ -328,9 +282,11 @@ public class ItInternalTableScanTest {
*/
@Test
public void testExceptionRowScanCursorHasNext() throws Exception {
- AtomicReference<Throwable> gotException = new AtomicReference<>();
+ // The latch that allows awaiting both Subscriber.onError() and the cursor close before asserting test invariants;
+ // this avoids the race between closing the cursor and stopping the node.
+ CountDownLatch subscriberFinishedLatch = new CountDownLatch(2);
- AtomicBoolean cursorClosed = new AtomicBoolean(false);
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
when(mockStorage.scan(any())).thenAnswer(invocation -> {
var cursor = mock(Cursor.class);
@@ -343,7 +299,7 @@ public class ItInternalTableScanTest {
doAnswer(
invocationClose -> {
- cursorClosed.set(true);
+ subscriberFinishedLatch.countDown();
return null;
}
).when(cursor).close();
@@ -366,6 +322,7 @@ public class ItInternalTableScanTest {
@Override
public void onError(Throwable throwable) {
gotException.set(throwable);
+ subscriberFinishedLatch.countDown();
}
@Override
@@ -374,11 +331,9 @@ public class ItInternalTableScanTest {
}
});
- assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+ subscriberFinishedLatch.await();
assertEquals(gotException.get().getCause().getClass(), NoSuchElementException.class);
-
- assertTrue(waitForCondition(cursorClosed::get, 1_000));
}
/**
@@ -386,6 +341,9 @@ public class ItInternalTableScanTest {
*/
@Test
public void testExceptionRowScan() throws Exception {
+ // The latch that allows to await Subscriber.onError() before asserting test invariants.
+ CountDownLatch gotExceptionLatch = new CountDownLatch(1);
+
AtomicReference<Throwable> gotException = new AtomicReference<>();
when(mockStorage.scan(any())).thenThrow(new StorageException("Some storage exception"));
@@ -405,6 +363,7 @@ public class ItInternalTableScanTest {
@Override
public void onError(Throwable throwable) {
gotException.set(throwable);
+ gotExceptionLatch.countDown();
}
@Override
@@ -413,7 +372,7 @@ public class ItInternalTableScanTest {
}
});
- assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+ gotExceptionLatch.await();
assertEquals(gotException.get().getCause().getClass(), StorageException.class);
}
@@ -466,6 +425,9 @@ public class ItInternalTableScanTest {
}
});
+ // The latch that allows to await Subscriber.onError() before asserting test invariants.
+ CountDownLatch gotExceptionLatch = new CountDownLatch(1);
+
AtomicReference<Throwable> gotException = new AtomicReference<>();
scan.subscribe(new Subscriber<>() {
@@ -482,6 +444,7 @@ public class ItInternalTableScanTest {
@Override
public void onError(Throwable throwable) {
gotException.set(throwable);
+ gotExceptionLatch.countDown();
}
@Override
@@ -490,7 +453,7 @@ public class ItInternalTableScanTest {
}
});
- assertTrue(waitForCondition(() -> gotException.get() != null, 1_000));
+ gotExceptionLatch.await();
assertEquals(gotException.get().getClass(), IllegalStateException.class);
}
@@ -529,7 +492,7 @@ public class ItInternalTableScanTest {
/**
* Checks whether publisher provides all existing data and then completes if requested by reqAmount rows at a time.
*
- * @param submittedItems Items to be pushed by ublisher.
+ * @param submittedItems Items to be pushed by publisher.
* @param reqAmount Amount of rows to request at a time.
* @throws Exception If Any.
*/
@@ -548,7 +511,8 @@ public class ItInternalTableScanTest {
return cursor;
});
- AtomicBoolean noMoreData = new AtomicBoolean(false);
+ // The latch that allows awaiting Subscriber.onComplete() before asserting test invariants.
+ CountDownLatch subscriberAllDataAwaitLatch = new CountDownLatch(1);
internalTbl.scan(0, null).subscribe(new Subscriber<>() {
private Subscription subscription;
@@ -576,11 +540,13 @@ public class ItInternalTableScanTest {
@Override
public void onComplete() {
- noMoreData.set(true);
+ subscriberAllDataAwaitLatch.countDown();
}
});
- assertTrue(waitForCondition(() -> retrievedItems.size() == submittedItems.size(), 2_000));
+ subscriberAllDataAwaitLatch.await();
+
+ assertEquals(submittedItems.size(), retrievedItems.size());
List<byte[]> expItems = submittedItems.stream().map(DataRow::valueBytes).collect(Collectors.toList());
List<byte[]> gotItems = retrievedItems.stream().map(BinaryRow::bytes).collect(Collectors.toList());
@@ -588,7 +554,69 @@ public class ItInternalTableScanTest {
for (int i = 0; i < expItems.size(); i++) {
assertTrue(Arrays.equals(expItems.get(i), gotItems.get(i)));
}
+ }
+
+ /**
+ * Checks whether {@link IllegalArgumentException} is thrown and inner storage cursor is closed in case of invalid requested amount of
+ * items.
+ *
+ * @param reqAmount Amount of rows to request at a time.
+ * @throws InterruptedException If the current thread is interrupted while awaiting the latch.
+ */
+ private void invalidRequestNtest(int reqAmount) throws InterruptedException {
+ // The latch that allows awaiting both Subscriber.onError() and the cursor close before asserting test invariants;
+ // this avoids the race between closing the cursor and stopping the node.
+ CountDownLatch subscriberFinishedLatch = new CountDownLatch(2);
+
+ when(mockStorage.scan(any())).thenAnswer(invocation -> {
+ var cursor = mock(Cursor.class);
+
+ doAnswer(
+ invocationClose -> {
+ subscriberFinishedLatch.countDown();
+ return null;
+ }
+ ).when(cursor).close();
- assertTrue(noMoreData.get(), "More data is not expected.");
+ when(cursor.hasNext()).thenAnswer(hnInvocation -> {
+ throw new StorageException("test");
+ });
+
+ return cursor;
+ });
+
+ AtomicReference<Throwable> gotException = new AtomicReference<>();
+
+ internalTbl.scan(0, null).subscribe(new Subscriber<>() {
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(reqAmount);
+ }
+
+ @Override
+ public void onNext(BinaryRow item) {
+ fail("Should never get here.");
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ gotException.set(throwable);
+ subscriberFinishedLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ fail("Should never get here.");
+ }
+ });
+
+ subscriberFinishedLatch.await();
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ throw gotException.get();
+ }
+ );
}
}