You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/02 08:59:18 UTC

[GitHub] [ignite] tkalkirill commented on a diff in pull request #10417: IGNITE-17719 IgnitePdsThreadInterruptionTest#testInterruptsOnWALWrite hangs

tkalkirill commented on code in PR #10417:
URL: https://github.com/apache/ignite/pull/10417#discussion_r1037919332


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java:
##########
@@ -83,105 +80,97 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
         return cfg;
     }
 
-    /** */
-    @Before
-    public void before() throws Exception {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
         cleanPersistenceDir();
     }
 
-    /** */
-    @After
-    public void after() throws Exception {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
     /**
      * Tests interruptions on read.
      *
      * @throws Exception If failed.
      */
     @Test
     public void testInterruptsOnRead() throws Exception {
-        Ignite ignite = startGrid();
+        Ignite ignite = startGrid(0);
 
-        ignite.cluster().active(true);
+        ignite.cluster().state(ACTIVE);
 
-        int maxKey = 10_000;
+        int keyCount = 10_000;
 
-        Set<Integer> keysToCheck = new HashSet<>();
-
-        Thread[] workers = new Thread[THREADS_CNT];
+        byte[] value = new byte[8_192];
 
         // Load data.
         try (IgniteDataStreamer<Integer, byte[]> st = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
             st.allowOverwrite(true);
 
-            for (int i = 0; i < maxKey; i++) {
-                keysToCheck.add(i);
-
-                st.addData(i, PAYLOAD);
-            }
+            for (int i = 0; i < keyCount; i++)
+                st.addData(i, value);
         }
 
         IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-        AtomicReference<Throwable> fail = new AtomicReference<>();
+        Collection<Throwable> readThreadsError = new ConcurrentLinkedQueue<>();
 
-        for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(() -> cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+        CountDownLatch startThreadsLatch = new CountDownLatch(THREADS_CNT);
 
-            workers[i].setName("reader-" + i);
+        Thread[] workers = new Thread[THREADS_CNT];
 
-            workers[i].setUncaughtExceptionHandler((t, e) -> {
-                // We can get IgniteInterruptedException on GridCacheAdapter.asyncOpsSem if thread was interrupted
-                // before asyncOpsSem.acquire().
-                if (!X.hasCause(e,
-                    "Failed to wait for asynchronous operation permit",
-                    IgniteInterruptedException.class)) {
-                    fail.compareAndSet(null, e);
-                }
-            });
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(
+                () -> {
+                    try {
+                        startThreadsLatch.countDown();
+
+                        cache.get(ThreadLocalRandom.current().nextInt(keyCount / 5));
+                    }
+                    catch (Throwable throwable) {
+                        if (!X.hasCause(
+                            throwable,
+                            "Failed to wait for asynchronous operation permit",
+                            IgniteInterruptedException.class
+                        ))
+                            readThreadsError.add(throwable);
+                    }
+                },
+                "cache-reader-from-test" + i
+            );
         }
 
         for (Thread worker : workers)
             worker.start();
 
-        // Interrupts should not affect reads.
-        for (int i = 0; i < workers.length / 2; i++)
-            workers[i].interrupt();
-
-        U.sleep(3_000);
+        startThreadsLatch.await();

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org