You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/01/30 11:50:45 UTC
[ignite] branch master updated: IGNITE-10995 Raise a critical
failure when a node cannot process demand message - Fixes #5923.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3b00b47 IGNITE-10995 Raise a critical failure when a node cannot process demand message - Fixes #5923.
3b00b47 is described below
commit 3b00b47dda99e8860e3c66c871a8ca6657a72b3d
Author: mstepachev <ma...@gmail.com>
AuthorDate: Wed Jan 30 14:47:10 2019 +0300
IGNITE-10995 Raise a critical failure when a node cannot process demand message - Fixes #5923.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../dht/preloader/GridDhtPartitionSupplier.java | 7 +
.../IgniteShutdownOnSupplyMessageFailureTest.java | 250 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsMvccTestSuite2.java | 4 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
4 files changed, 264 insertions(+)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d26d68f..c6bcc80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -30,6 +30,8 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -492,6 +494,11 @@ class GridDhtPartitionSupplier {
U.error(log, "Failed to send supply error message ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
}
+
+ grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
+ new IgniteCheckedException("Failed to continue supplying ["
+ + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t)
+ ));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
new file mode 100644
index 0000000..18fce87
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Checks that a node supplying rebalance data is stopped by its failure handler when it
+ * cannot process a demand message (here: because every file read on it fails).
+ */
+@RunWith(JUnit4.class)
+public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstractTest {
+    /** Rebalance cache name. */
+    private static final String TEST_REBALANCE_CACHE = "b13813zk";
+
+    /** Wal history size. */
+    private static final int WAL_HISTORY_SIZE = 30;
+
+    /** Index of the node whose file reads are made to fail. */
+    private static final int NODE_NAME_WITH_TEST_FILE_FACTORY = 0;
+
+    /** Index of the node that listens to node left events. */
+    private static final int NODE_NAME_LISTEN_TO_LEFT_EVENT = 1;
+
+    /** Maximum time to wait for the failing node to leave the topology, in milliseconds. */
+    private static final long NODE_LEFT_TIMEOUT_MS = 60_000L;
+
+    /**
+     * Counted down when the listening node observes a node left event. Kept per test instance
+     * (not static) so that a previous run cannot leave it already released.
+     */
+    private final CountDownLatch nodeLeftLatch = new CountDownLatch(1);
+
+    /** When set, every file read on the node with the test file factory fails. */
+    private final AtomicBoolean canFailFirstNode = new AtomicBoolean();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        DataStorageConfiguration conf = new DataStorageConfiguration()
+            .setWalHistorySize(WAL_HISTORY_SIZE)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+                    .setPersistenceEnabled(true)
+            )
+            .setWalMode(WALMode.FSYNC)
+            .setCheckpointFrequency(500);
+
+        if (name.equals(getTestIgniteInstanceName(NODE_NAME_WITH_TEST_FILE_FACTORY)))
+            conf.setFileIOFactory(new FailingFileIOFactory(canFailFirstNode));
+
+        if (name.equals(getTestIgniteInstanceName(NODE_NAME_LISTEN_TO_LEFT_EVENT)))
+            registerLeftEvent(cfg);
+
+        cfg.setDataStorageConfiguration(conf);
+
+        // A critical failure must stop the node so that the other node sees it leave.
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks that the supplier node is stopped by its failure handler when an error is thrown
+     * while it handles a demand message.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testShutdownOnSupplyMessageFailure() throws Exception {
+        IgniteEx ig = startGrid(0);
+        IgniteEx awayNode = startGrid(1);
+
+        ig.cluster().active(true);
+
+        createCache(ig, TEST_REBALANCE_CACHE);
+
+        populateCache(ig, TEST_REBALANCE_CACHE, 0, 3_000);
+
+        stopGrid(1);
+
+        populateCache(ig, TEST_REBALANCE_CACHE, 3_000, 6_000);
+
+        canFailFirstNode.set(true);
+
+        // Refresh the reference: the instance obtained above was stopped by stopGrid(1),
+        // so its discovery state must not be used for the assertions below.
+        awayNode = startGrid(1);
+
+        assertTrue("Node " + NODE_NAME_WITH_TEST_FILE_FACTORY + " did not leave the topology in " +
+            NODE_LEFT_TIMEOUT_MS + " ms.", nodeLeftLatch.await(NODE_LEFT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+        // Only the restarted node must remain alive.
+        assertEquals(1, awayNode.context().discovery().aliveServerNodes().size());
+        assertFalse(awayNode.context().discovery().alive(ig.context().localNodeId()));
+    }
+
+    /**
+     * Creates a transactional, single-partition cache with one backup.
+     *
+     * @param ig Node to create the cache on.
+     * @param cacheName Cache name.
+     */
+    private void createCache(IgniteEx ig, String cacheName) {
+        ig.getOrCreateCache(new CacheConfiguration<>(cacheName)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(1)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setRebalanceBatchSize(100));
+    }
+
+    /**
+     * Streams {@code cnt} entries with keys starting at {@code startKey}, then waits for a checkpoint.
+     *
+     * @param ig Node to stream data through.
+     * @param cacheName Cache name.
+     * @param startKey First key of the range.
+     * @param cnt Number of keys to load.
+     * @throws IgniteCheckedException If waiting for the checkpoint failed.
+     */
+    private void populateCache(IgniteEx ig, String cacheName, int startKey, int cnt) throws IgniteCheckedException {
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cacheName)) {
+            for (int i = startKey; i < startKey + cnt; i++)
+                streamer.addData(i, new byte[5 * 1000]);
+        }
+
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        dbMgr.waitForCheckpoint("test");
+    }
+
+    /**
+     * Enables {@code EVT_NODE_LEFT} recording and registers a local listener that releases
+     * {@link #nodeLeftLatch} on every node left event.
+     *
+     * @param cfg Config.
+     */
+    private void registerLeftEvent(IgniteConfiguration cfg) {
+        int[] evts = {EVT_NODE_LEFT};
+
+        cfg.setIncludeEventTypes(evts);
+
+        // Capture the latch itself so the listener does not reference the test instance.
+        CountDownLatch latch = nodeLeftLatch;
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put((IgnitePredicate<Event>)evt -> {
+            latch.countDown();
+
+            return true;
+        }, evts);
+
+        cfg.setLocalEventListeners(lsnrs);
+    }
+
+    /**
+     * File I/O factory whose files throw {@link IOException} on every read while the shared flag
+     * is {@code true}. Writes are always delegated unchanged.
+     */
+    private static class FailingFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** When {@code true}, all reads fail. */
+        private final AtomicBoolean fail;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /**
+         * @param fail Flag switching reads into failing mode.
+         */
+        FailingFileIOFactory(AtomicBoolean fail) {
+            this.fail = fail;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, modes);
+
+            return new FileIODecorator(delegate) {
+                /** {@inheritDoc} */
+                @Override public int read(ByteBuffer destBuf, long position) throws IOException {
+                    checkFail();
+
+                    return super.read(destBuf, position);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int read(ByteBuffer destBuf) throws IOException {
+                    checkFail();
+
+                    return super.read(destBuf);
+                }
+
+                /** {@inheritDoc} */
+                @Override public int read(byte[] buf, int off, int len) throws IOException {
+                    checkFail();
+
+                    return super.read(buf, off, len);
+                }
+            };
+        }
+
+        /**
+         * @throws IOException If reads are currently configured to fail.
+         */
+        private void checkFail() throws IOException {
+            if (fail != null && fail.get())
+                throw new IOException("Test crash.");
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index 7dd5ec1..38f93c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeNoCha
import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAbsentEvictionNodeOutOfBaselineTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest;
@@ -59,6 +60,9 @@ public class IgnitePdsMvccTestSuite2 {
Collection<Class> ignoredTests = new HashSet<>();
+ // TODO IGNITE-7384: include test when implemented.
+ ignoredTests.add(IgniteShutdownOnSupplyMessageFailureTest.class);
+
// Classes that are contained mvcc test already.
ignoredTests.add(LocalWalModeNoChangeDuringRebalanceOnNonNodeAssignTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 49a56dc..5593dce 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
@@ -159,6 +160,8 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, SlowHistoricalRebalanceSmallHistoryTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteShutdownOnSupplyMessageFailureTest.class, ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite, IgnitePersistentStoreDataStructuresTest.class, ignoredTests);
// Failover test