You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/09/29 08:52:09 UTC
[ignite] branch master updated: IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7583fe863 IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.
1d7583fe863 is described below
commit 1d7583fe863e586536e230e7455d8f65d3495631
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Thu Sep 29 11:50:15 2022 +0300
IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.
---
.../distributed/dht/atomic/GridDhtAtomicCache.java | 12 ++-
.../persistence/wal/SegmentedRingByteBuffer.java | 6 +-
.../wal/filehandle/FileWriteHandleImpl.java | 16 ++--
.../processors/database/IgniteDbAbstractTest.java | 5 ++
.../database/IgniteDbPutGetAbstractTest.java | 91 +++++++++++++++++++++-
...ingleNodeWithIndexingPutGetPersistenceTest.java | 15 ----
6 files changed, 120 insertions(+), 25 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 673ddbcf359..07d76d71747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -40,6 +40,8 @@ import org.apache.ignite.binary.BinaryInvalidTypeException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
@@ -3365,12 +3367,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class))
throw (RuntimeException)e;
- IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+ U.error(log, "Failed to update key on backup node: " + key, e);
+
+ IgniteCheckedException err =
+ new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+ // Trigger failure handler to avoid data inconsistency.
+ ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
if (nearRes != null)
nearRes.addFailedKey(key, err);
-
- U.error(log, "Failed to update key on backup node: " + key, e);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
index 4a741c3602e..37789317c4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -188,6 +189,7 @@ public class SegmentedRingByteBuffer {
* @param size Amount of bytes for reserve.
* @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
* {@code null} if buffer space is not enough.
+ * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
*/
public WriteSegment offer(int size) {
return offer0(size, false);
@@ -200,6 +202,7 @@ public class SegmentedRingByteBuffer {
* @param size Amount of bytes for reserve.
* @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
* {@code null} if buffer space is not enough.
+ * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
*/
public WriteSegment offerSafe(int size) {
return offer0(size, true);
@@ -208,10 +211,11 @@ public class SegmentedRingByteBuffer {
/**
* @param size Amount of bytes for reserve.
* @param safe Safe mode.
+ * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
*/
private WriteSegment offer0(int size, boolean safe) {
if (size > cap)
- throw new IllegalArgumentException("Record is too long [capacity=" + cap + ", size=" + size + ']');
+ throw new IgniteException("Record is too long [capacity=" + cap + ", size=" + size + ']');
for (;;) {
if (!waitForConsumer) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 1eaffde8b12..5dc2ec155f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -226,11 +226,17 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
SegmentedRingByteBuffer.WriteSegment seg;
- // Buffer can be in open state in case of resuming with different serializer version.
- if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
- seg = buf.offerSafe(rec.size());
- else
- seg = buf.offer(rec.size());
+ try {
+ // Buffer can be in open state in case of resuming with different serializer version.
+ if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
+ seg = buf.offerSafe(rec.size());
+ else
+ seg = buf.offer(rec.size());
+ }
+ catch (IgniteException e) {
+ // WAL record size is greater than the buffer's capacity.
+ throw new IgniteCheckedException(e);
+ }
WALPointer ptr = null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
index 1c6f4fca50e..f7c3c86c089 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
@@ -89,6 +89,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setRebalanceMode(SYNC);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg.setBackups(1);
CacheConfiguration ccfg2 = new CacheConfiguration("non-primitive");
@@ -99,6 +100,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
ccfg2.setWriteSynchronizationMode(FULL_SYNC);
ccfg2.setRebalanceMode(SYNC);
ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg2.setBackups(1);
CacheConfiguration ccfg3 = new CacheConfiguration("large");
@@ -109,6 +111,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
ccfg3.setWriteSynchronizationMode(FULL_SYNC);
ccfg3.setRebalanceMode(SYNC);
ccfg3.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg3.setBackups(1);
CacheConfiguration ccfg4 = new CacheConfiguration("tiny");
@@ -116,6 +119,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
ccfg4.setWriteSynchronizationMode(FULL_SYNC);
ccfg4.setRebalanceMode(SYNC);
ccfg4.setAffinity(new RendezvousAffinityFunction(1, null));
+ ccfg4.setBackups(1);
CacheConfiguration ccfg5 = new CacheConfiguration("atomic");
@@ -126,6 +130,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
ccfg5.setWriteSynchronizationMode(FULL_SYNC);
ccfg5.setRebalanceMode(SYNC);
ccfg5.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg5.setBackups(1);
if (!client)
cfg.setCacheConfiguration(ccfg, ccfg2, ccfg3, ccfg4, ccfg5);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
index bb018d387fc..05ee63dd098 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
@@ -27,18 +27,22 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -52,6 +56,12 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_SEGMENT_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+
/**
*
*/
@@ -59,6 +69,12 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
/** */
private static final int KEYS_COUNT = SF.applyLB(10_000, 2_000);
+ /** Index of Ignite node with a reduced WAL buffer size. */
+ private static final int smallWalBufSizeNodeIdx = 0;
+
+ /** Names of nodes on which the test failure handler was invoked (i.e. a critical failure was reported). */
+ private final Set<String> failedNodes = new ConcurrentSkipListSet<>();
+
/**
* @return Ignite instance for testing.
*/
@@ -69,6 +85,27 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
return grid(0);
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (getTestIgniteInstanceIndex(gridName) == smallWalBufSizeNodeIdx) {
+ cfg.getDataStorageConfiguration()
+ .setWalBufferSize(DFLT_WAL_SEGMENT_SIZE / 4)
+ .setWalSegmentSize(DFLT_WAL_SEGMENT_SIZE / 4);
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return (ignite, failureCtx) -> {
+ failedNodes.add(ignite.name());
+ return true;
+ };
+ }
+
/**
* @return Cache for testing.
* @throws Exception If failed.
@@ -273,6 +310,58 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
assertNull(cache1.get(1));
}
+ /**
+ * Tests that putting a large entry, whose size is greater than the WAL buffer/segment size, results in an {@link IgniteException}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPutLargeEntry() throws Exception {
+ assertTrue(
+ "Primary key should correspond to the node with small wal buffer size.",
+ smallWalBufSizeNodeIdx == 0);
+
+ IgniteCache<Integer, byte[]> atomicCache = grid(0).cache("atomic");
+ Integer atomicPrimaryKey = primaryKey(atomicCache);
+
+ // New value whose size is greater than the reduced WAL segment size and WAL buffer size.
+ byte[] newVal = new byte[DFLT_WAL_SEGMENT_SIZE / 2];
+
+ assertThrows(
+ log,
+ () -> atomicCache.put(atomicPrimaryKey, newVal),
+ IgniteException.class,
+ null);
+ assertNull("Unexpected non-null value.", atomicCache.get(atomicPrimaryKey));
+ assertTrue("Unexpected system critical error.", failedNodes.isEmpty());
+
+ // Check backup scenario.
+ if (gridCount() > 1) {
+ Integer atomicBackupKey = backupKey(atomicCache);
+ Ignite primaryNode = primaryNode(atomicBackupKey, atomicCache.getName());
+
+ // The primary node should be updated successfully;
+ // however, the backup node should fail because the new entry is too large to be written to its WAL.
+ assertThrows(
+ log,
+ () -> atomicCache.put(atomicBackupKey, newVal),
+ IgniteException.class,
+ "Failed to update keys");
+
+ assertThat(
+ "Unexpected value.",
+ primaryNode.cache(atomicCache.getName()).get(atomicBackupKey),
+ is(newVal));
+
+ assertThat(
+ "Failure handler was not triggered on backup node.",
+ failedNodes,
+ hasItem(grid(0).name()));
+
+ assertFalse("Unexpected system critical error(s).", failedNodes.size() > 1);
+ }
+ }
+
/**
* @throws Exception If failed.
*/
@@ -1075,7 +1164,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
int cnt = 0;
- for (Cache.Entry<DbKey, DbValue> e : cache0.localEntries()) {
+ for (Cache.Entry<DbKey, DbValue> e : cache0.localEntries(CachePeekMode.PRIMARY)) {
cnt++;
allKeys.add(e.getKey());
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
index d91fce6abeb..7e47604e73a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
@@ -17,27 +17,12 @@
package org.apache.ignite.internal.processors.cache;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
/**
*
*/
public class IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest extends IgniteDbSingleNodeWithIndexingPutGetTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setDataStorageConfiguration(
- new DataStorageConfiguration()
- .setWalMode(WALMode.LOG_ONLY)
- );
-
- return cfg;
- }
-
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
cleanPersistenceDir();