You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2019/01/11 18:30:37 UTC

[GitHub] asfgit closed pull request #5810: Ignite 10891 Fix flaky IgnitePdsThreadInterruptionTest.testInterruptsOnLFSRead

asfgit closed pull request #5810: Ignite 10891 Fix flaky IgnitePdsThreadInterruptionTest.testInterruptsOnLFSRead
URL: https://github.com/apache/ignite/pull/5810
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
index aa1b06161c04..c04073e1373e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -163,17 +163,23 @@ public void waitUnlock(FullPageId id) {
                 if (!hasLockedPages)
                     return;
 
+                boolean interrupted = false;
+
                 while (locked.contains(id)) {
                     if (log.isDebugEnabled())
                         log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement");
 
                     try {
+                        // Uninterruptable wait.
                         locked.wait();
                     }
                     catch (InterruptedException e) {
-                        throw new IgniteInterruptedException(e);
+                        interrupted = true;
                     }
                 }
+
+                if (interrupted)
+                    Thread.currentThread().interrupt();
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
index 59371e3a5049..7c5fd8192fb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
@@ -460,12 +460,13 @@ private static void cloneField(Map<Integer, Integer> identityIdxs, List<Object>
      * into check.
      *
      * @param t Throwable to check (if {@code null}, {@code false} is returned).
+     * @param msg Message text that should be in cause.
      * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
      * @return {@code True} if one of the causing exception is an instance of passed in classes,
      *      {@code false} otherwise.
      */
     @SafeVarargs
-    public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls) {
+    public static boolean hasCause(@Nullable Throwable t,  @Nullable String msg, @Nullable Class<?>... cls) {
         if (t == null || F.isEmpty(cls))
             return false;
 
@@ -473,12 +474,20 @@ public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls)
 
         for (Throwable th = t; th != null; th = th.getCause()) {
             for (Class<?> c : cls) {
-                if (c.isAssignableFrom(th.getClass()))
+                if (c.isAssignableFrom(th.getClass())) {
+                    if (msg != null) {
+                        if (th.getMessage() != null && th.getMessage().contains(msg))
+                            return true;
+                        else
+                            continue;
+                    }
+
                     return true;
+                }
             }
 
             for (Throwable n : th.getSuppressed()) {
-                if (hasCause(n, cls))
+                if (hasCause(n, msg, cls))
                     return true;
             }
 
@@ -489,6 +498,19 @@ public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls)
         return false;
     }
 
+    /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy <b>including</b> that
+     * throwable itself. <p> Note that this method follows includes {@link Throwable#getSuppressed()} into check.
+     *
+     * @param t Throwable to check (if {@code null}, {@code false} is returned).
+     * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
+     * @return {@code True} if one of the causing exception is an instance of passed in classes, {@code false}
+     * otherwise.
+     */
+    public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls) {
+        return hasCause(t, null, cls);
+    }
+
     /**
      * Checks if passed throwable has given class in one of the suppressed exceptions.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
index 4b7db7dd4d54..9f7f79183f3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
@@ -17,19 +17,29 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.file;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,110 +50,95 @@
 @RunWith(JUnit4.class)
 public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
     /** */
-    private static final int PAGE_SIZE = 1 << 12; // 4096
+    public static final int THREADS_CNT = 100;
 
     /** */
-    public static final int THREADS_CNT = 100;
+    private static final int VAL_LEN = 8192;
 
-    /**
-     * Cache name.
-     */
-    private final String CACHE_NAME = "cache";
+    /** */
+    private static final byte[] PAYLOAD = new byte[VAL_LEN];
 
     /** */
-    private volatile boolean stop = false;
+    private volatile boolean stop;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        final IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setDataStorageConfiguration(storageConfiguration());
-
-        CacheConfiguration ccfg = new CacheConfiguration<>(CACHE_NAME);
-
-        RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
-        affinityFunction.setPartitions(1);
-
-        ccfg.setAffinity(affinityFunction);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /**
-     * @return DataStorage configuration.
-     */
-    private DataStorageConfiguration storageConfiguration() {
-        DataRegionConfiguration regionCfg = new DataRegionConfiguration()
-                .setInitialSize(10L * 1024L * 1024L)
-                .setMaxSize(10L * 1024L * 1024L)
-                .setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
-
-        DataStorageConfiguration cfg = new DataStorageConfiguration()
-                .setWalMode(WALMode.LOG_ONLY)
-                .setWalFsyncDelayNanos(0)
-                .setPageSize(PAGE_SIZE)
-                .setFileIOFactory(new AsyncFileIOFactory());
-
-        cfg.setDefaultDataRegionConfiguration(regionCfg);
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalFsyncDelayNanos(0)
+            .setFileIOFactory(new AsyncFileIOFactory())
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setInitialSize(10L * 1024L * 1024L)
+                    .setMaxSize(10L * 1024L * 1024L)
+            ));
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 1))
+        );
 
         return cfg;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTestsStarted();
-
+    @Before
+    public void before() throws Exception {
         cleanPersistenceDir();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    @After
+    public void after() throws Exception {
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
     /**
-     * Tests interruptions on LFS read.
+     * Tests interruptions on read.
      *
      * @throws Exception If failed.
      */
     @Test
-    public void testInterruptsOnLFSRead() throws Exception {
-        final Ignite ignite = startGrid();
-
-        ignite.active(true);
+    public void testInterruptsOnRead() throws Exception {
+        Ignite ignite = startGrid();
 
-        final int valLen = 8192;
+        ignite.cluster().active(true);
 
-        final byte[] payload = new byte[valLen];
+        int maxKey = 10_000;
 
-        final int maxKey = 10_000;
+        Set<Integer> keysToCheck = new HashSet<>();
 
         Thread[] workers = new Thread[THREADS_CNT];
 
+        // Load data.
+        try (IgniteDataStreamer<Integer, byte[]> st = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+            st.allowOverwrite(true);
 
-        final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
-
-        for (int i=0; i < maxKey; i++)
-            cache.put(i, payload);
+            for (int i = 0; i < maxKey; i++){
+                keysToCheck.add(i);
 
-        final AtomicReference<Throwable> fail = new AtomicReference<>();
+                st.addData(i, PAYLOAD);
+            }
+        }
 
+        IgniteCache<Integer,  byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-        Runnable clo = new Runnable() {
-            @Override public void run() {
-                cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5));
-            }
-        };
+        AtomicReference<Throwable> fail = new AtomicReference<>();
 
         for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(clo);
+            workers[i] = new Thread(() -> cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+
             workers[i].setName("reader-" + i);
-            workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                @Override public void uncaughtException(Thread t, Throwable e) {
+
+            workers[i].setUncaughtExceptionHandler((t, e) -> {
+                // We can get IgniteInterruptedException on GridCacheAdapter.asyncOpsSem if thread was interrupted
+                // before asyncOpsSem.acquire().
+                if (!X.hasCause(e,
+                    "Failed to wait for asynchronous operation permit",
+                    IgniteInterruptedException.class)) {
                     fail.compareAndSet(null, e);
                 }
             });
@@ -152,15 +147,11 @@ public void testInterruptsOnLFSRead() throws Exception {
         for (Thread worker : workers)
             worker.start();
 
-        //Thread.sleep(3_000);
-
         // Interrupts should not affect reads.
         for (int i = 0;i < workers.length / 2; i++)
             workers[i].interrupt();
 
-        Thread.sleep(3_000);
-
-        stop = true;
+        U.sleep(3_000);
 
         for (Thread worker : workers)
             worker.join();
@@ -171,53 +162,61 @@ public void testInterruptsOnLFSRead() throws Exception {
 
         int verifiedKeys = 0;
 
+        // Get all keys.
+        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+        Assert.assertEquals(maxKey, keysToCheck.size());
+        Assert.assertEquals(maxKey, res.size());
+
         // Post check.
-        for (int i = 0; i < maxKey; i++) {
-            byte[] val = (byte[]) cache.get(i);
+        for (Integer key: keysToCheck) {
+            byte[] val = res.get(key);
 
-            if (val != null) {
-                assertEquals("Illegal length", valLen, val.length);
+            assertNotNull(val);
+            assertEquals("Illegal length", VAL_LEN, val.length);
 
-                verifiedKeys++;
-            }
+            verifiedKeys++;
         }
 
+        Assert.assertEquals(maxKey, verifiedKeys);
+
         log.info("Verified keys: " + verifiedKeys);
     }
 
     /**
      * Tests interruptions on WAL write.
      *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     @Test
     public void testInterruptsOnWALWrite() throws Exception {
-        final Ignite ignite = startGrid();
+        Ignite ignite = startGrid();
 
-        ignite.active(true);
+        ignite.cluster().active(true);
 
-        final int valLen = 8192;
+        int maxKey = 100_000;
 
-        final byte[] payload = new byte[valLen];
-
-        final int maxKey = 100_000;
+        Set<Integer> keysToCheck = new GridConcurrentHashSet<>();
 
         Thread[] workers = new Thread[THREADS_CNT];
 
-        final AtomicReference<Throwable> fail = new AtomicReference<>();
+        AtomicReference<Throwable> fail = new AtomicReference<>();
 
-        Runnable clo = new Runnable() {
-            @Override public void run() {
-                IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(() -> {
+                IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-                while (!stop)
-                    cache.put(ThreadLocalRandom.current().nextInt(maxKey), payload);
-            }
-        };
+                while (!stop) {
+                    int key = ThreadLocalRandom.current().nextInt(maxKey);
+
+                    cache.put(key, PAYLOAD);
+
+                    keysToCheck.add(key);
+                }
+            });
 
-        for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(clo);
             workers[i].setName("writer-" + i);
+
             workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                 @Override public void uncaughtException(Thread t, Throwable e) {
                     fail.compareAndSet(null, e);
@@ -245,19 +244,22 @@ public void testInterruptsOnWALWrite() throws Exception {
 
         assertNull(t);
 
-        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+        IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
         int verifiedKeys = 0;
 
+        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+        Assert.assertEquals(res.size(), keysToCheck.size());
+
         // Post check.
-        for (int i = 0; i < maxKey; i++) {
-            byte[] val = (byte[]) cache.get(i);
+        for (Integer key: keysToCheck) {
+            byte[] val = res.get(key);
 
-            if (val != null) {
-                assertEquals("Illegal length", valLen, val.length);
+            assertNotNull(val);
+            assertEquals("Illegal length", VAL_LEN, val.length);
 
-                verifiedKeys++;
-            }
+            verifiedKeys++;
         }
 
         log.info("Verified keys: " + verifiedKeys);
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index e6a394902fc5..38179949eb6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -103,6 +103,7 @@
 import org.apache.ignite.transactions.TransactionRollbackException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -691,6 +692,7 @@ public void testKillHangingLocalTransactions() throws Exception {
      * Simulate uncommitted backup transactions and test rolling back using utility.
      */
     @Test
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10899")
     public void testKillHangingRemoteTransactions() throws Exception {
         final int cnt = 3;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services