You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/30 11:50:45 UTC

[ignite] branch master updated: IGNITE-10995 Raise a critical failure when a node cannot process demand message - Fixes #5923.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b00b47  IGNITE-10995 Raise a critical failure when a node cannot process demand message - Fixes #5923.
3b00b47 is described below

commit 3b00b47dda99e8860e3c66c871a8ca6657a72b3d
Author: mstepachev <ma...@gmail.com>
AuthorDate: Wed Jan 30 14:47:10 2019 +0300

    IGNITE-10995 Raise a critical failure when a node cannot process demand message - Fixes #5923.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../dht/preloader/GridDhtPartitionSupplier.java    |   7 +
 .../IgniteShutdownOnSupplyMessageFailureTest.java  | 250 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsMvccTestSuite2.java |   4 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 4 files changed, 264 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d26d68f..c6bcc80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -30,6 +30,8 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -492,6 +494,11 @@ class GridDhtPartitionSupplier {
                 U.error(log, "Failed to send supply error message ["
                     + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
             }
+
+            grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
+                new IgniteCheckedException("Failed to continue supplying ["
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t)
+            ));
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
new file mode 100644
index 0000000..18fce87
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
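+ * Checks that a node which fails while supplying rebalance data is stopped by the configured failure handler.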
+ */
+@RunWith(JUnit4.class)
+public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstractTest {
+    /** Rebalance cache name. */
+    private static final String TEST_REBALANCE_CACHE = "b13813zk";
+
+    /** Wal history size. */
+    private static final int WAL_HISTORY_SIZE = 30;
+
+    /** Node name with test file factory. */
+    private static final int NODE_NAME_WITH_TEST_FILE_FACTORY = 0;
+
+    /** Index of the node that listens for the node-left event. */
+    private static final int NODE_NAME_LISTEN_TO_LEFT_EVENT = 1;
+
+    /** Released when the listening node sees a node-left event (expected once the supplier fails). */
+    private static final CountDownLatch WAIT_ON_SUPPLY_MESSAGE_FAILURE = new CountDownLatch(1);
+
+    /** When set, file reads on the node with the failing factory throw. */
+    private AtomicBoolean canFailFirstNode = new AtomicBoolean();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        DataStorageConfiguration conf = new DataStorageConfiguration()
+            .setWalHistorySize(WAL_HISTORY_SIZE)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+                    .setPersistenceEnabled(true)
+            )
+            .setWalMode(WALMode.FSYNC)
+            .setCheckpointFrequency(500);
+
+        if (name.equals(getTestIgniteInstanceName(NODE_NAME_WITH_TEST_FILE_FACTORY)))
+            conf.setFileIOFactory(new FailingFileIOFactory(canFailFirstNode));
+
+        if (name.equals(getTestIgniteInstanceName(NODE_NAME_LISTEN_TO_LEFT_EVENT)))
+            registerLeftEvent(cfg);
+
+        cfg.setDataStorageConfiguration(conf);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks that a node shuts down when an error is thrown while it handles a demand message.
+     */
+    @Test
+    public void testShutdownOnSupplyMessageFailure() throws Exception {
+        IgniteEx ig = startGrid(0);
+        IgniteEx awayNode = startGrid(1);
+
+        ig.cluster().active(true);
+
+        createCache(ig, TEST_REBALANCE_CACHE);
+
+        populateCache(ig, TEST_REBALANCE_CACHE, 0, 3_000);
+
+        stopGrid(1);
+
+        populateCache(ig, TEST_REBALANCE_CACHE, 3_000, 6_000);
+
+        canFailFirstNode.set(true);
+
+        startGrid(1);
+
+        WAIT_ON_SUPPLY_MESSAGE_FAILURE.await();
+
+        assertEquals(1, grid(1).context().discovery().aliveServerNodes().size());
+        assertFalse(awayNode.context().discovery().alive(ig.context().localNodeId())); // Only second node is alive
+    }
+
+    /**
+     * @param ig Node on which the cache is created.
+     * @param cacheName Cache name.
+     */
+    private void createCache(IgniteEx ig, String cacheName) {
+        ig.getOrCreateCache(new CacheConfiguration<>(cacheName)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(1)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setRebalanceBatchSize(100));
+    }
+
+    /**
+     * @param ig Node whose data streamer loads the entries.
+     * @param cacheName Cache name.
+     * @param startKey First key of the range to load.
+     * @param cnt Number of keys to load.
+     */
+    private void populateCache(IgniteEx ig, String cacheName, int startKey, int cnt) throws IgniteCheckedException {
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cacheName)) {
+            for (int i = startKey; i < startKey + cnt; i++)
+                streamer.addData(i, new byte[5 * 1000]);
+        }
+
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        dbMgr.waitForCheckpoint("test");
+    }
+
+    /**
+     * @param cfg Config.
+     */
+    private void registerLeftEvent(IgniteConfiguration cfg) {
+        int[] evts = {EVT_NODE_LEFT};
+
+        cfg.setIncludeEventTypes(evts);
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put((IgnitePredicate<Event>)event -> {
+            WAIT_ON_SUPPLY_MESSAGE_FAILURE.countDown();
+
+            return true;
+        }, evts);
+
+        cfg.setLocalEventListeners(lsnrs);
+    }
+
+    /**
+     * Creates file I/O whose read operations throw an {@link IOException} while the shared fail flag is set.
+     */
+    private static class FailingFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** When set, every read through the created file I/O throws an {@link IOException}. */
+        private AtomicBoolean fail;
+
+        /** Delegate factory that creates the underlying file I/O. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /** @param fail Flag that switches reads into failure mode. */
+        FailingFileIOFactory(AtomicBoolean fail) {
+            this.fail = fail;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, modes);
+
+            return new FileIODecorator(delegate) {
+
+                @Override public int read(ByteBuffer destBuf, long position) throws IOException {
+                    if (fail != null && fail.get())
+                        throw new IOException("Test crash.");
+
+                    return super.read(destBuf, position);
+                }
+
+                @Override public int read(ByteBuffer destBuf) throws IOException {
+                    if (fail != null && fail.get())
+                        throw new IOException("Test crash.");
+
+                    return super.read(destBuf);
+                }
+
+                @Override public int read(byte[] buf, int off, int len) throws IOException {
+                    if (fail != null && fail.get())
+                        throw new IOException("Test crash.");
+
+                    return super.read(buf, off, len);
+                }
+            };
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index 7dd5ec1..38f93c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeNoCha
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAbsentEvictionNodeOutOfBaselineTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest;
@@ -59,6 +60,9 @@ public class IgnitePdsMvccTestSuite2 {
 
         Collection<Class> ignoredTests = new HashSet<>();
 
+        // TODO IGNITE-7384: include test when implemented.
+        ignoredTests.add(IgniteShutdownOnSupplyMessageFailureTest.class);
+
         // Classes that are contained mvcc test already.
         ignoredTests.add(LocalWalModeNoChangeDuringRebalanceOnNonNodeAssignTest.class);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 49a56dc..5593dce 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
@@ -159,6 +160,8 @@ public class IgnitePdsTestSuite2 {
 
         GridTestUtils.addTestIfNeeded(suite, SlowHistoricalRebalanceSmallHistoryTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, IgniteShutdownOnSupplyMessageFailureTest.class, ignoredTests);
+
         GridTestUtils.addTestIfNeeded(suite, IgnitePersistentStoreDataStructuresTest.class, ignoredTests);
 
         // Failover test