You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/09/29 08:52:09 UTC

[ignite] branch master updated: IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d7583fe863 IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.
1d7583fe863 is described below

commit 1d7583fe863e586536e230e7455d8f65d3495631
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Thu Sep 29 11:50:15 2022 +0300

    IGNITE-17742 Fixed an issue that could lead to data corruption of atomic cache when a new updated entry is greater than WAL buffer size.
---
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 12 ++-
 .../persistence/wal/SegmentedRingByteBuffer.java   |  6 +-
 .../wal/filehandle/FileWriteHandleImpl.java        | 16 ++--
 .../processors/database/IgniteDbAbstractTest.java  |  5 ++
 .../database/IgniteDbPutGetAbstractTest.java       | 91 +++++++++++++++++++++-
 ...ingleNodeWithIndexingPutGetPersistenceTest.java | 15 ----
 6 files changed, 120 insertions(+), 25 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 673ddbcf359..07d76d71747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -40,6 +40,8 @@ import org.apache.ignite.binary.BinaryInvalidTypeException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.ReadRepairStrategy;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.UnregisteredBinaryTypeException;
@@ -3365,12 +3367,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     if (e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class))
                         throw (RuntimeException)e;
 
-                    IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+                    U.error(log, "Failed to update key on backup node: " + key, e);
+
+                    IgniteCheckedException err =
+                        new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+                    // Trigger failure handler to avoid data inconsistency.
+                    ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
 
                     if (nearRes != null)
                         nearRes.addFailedKey(key, err);
-
-                    U.error(log, "Failed to update key on backup node: " + key, e);
                 }
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
index 4a741c3602e..37789317c4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -188,6 +189,7 @@ public class SegmentedRingByteBuffer {
      * @param size Amount of bytes for reserve.
      * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
      * {@code null} if buffer space is not enough.
+     * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
      */
     public WriteSegment offer(int size) {
         return offer0(size, false);
@@ -200,6 +202,7 @@ public class SegmentedRingByteBuffer {
      * @param size Amount of bytes for reserve.
      * @return {@link WriteSegment} instance that point to {@link ByteBuffer} instance with given {@code size}.
      * {@code null} if buffer space is not enough.
+     * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
      */
     public WriteSegment offerSafe(int size) {
         return offer0(size, true);
@@ -208,10 +211,11 @@ public class SegmentedRingByteBuffer {
     /**
      * @param size Amount of bytes for reserve.
      * @param safe Safe mode.
+     * @throws IgniteException If the given {@code size} is greater than the capacity of this buffer.
      */
     private WriteSegment offer0(int size, boolean safe) {
         if (size > cap)
-            throw new IllegalArgumentException("Record is too long [capacity=" + cap + ", size=" + size + ']');
+            throw new IgniteException("Record is too long [capacity=" + cap + ", size=" + size + ']');
 
         for (;;) {
             if (!waitForConsumer) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 1eaffde8b12..5dc2ec155f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -226,11 +226,17 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle
 
             SegmentedRingByteBuffer.WriteSegment seg;
 
-            // Buffer can be in open state in case of resuming with different serializer version.
-            if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
-                seg = buf.offerSafe(rec.size());
-            else
-                seg = buf.offer(rec.size());
+            try {
+                // The buffer can be in the open state when resuming with a different serializer version.
+                if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
+                    seg = buf.offerSafe(rec.size());
+                else
+                    seg = buf.offer(rec.size());
+            }
+            catch (IgniteException e) {
+                // WAL record size is greater than the buffer's capacity.
+                throw new IgniteCheckedException(e);
+            }
 
             WALPointer ptr = null;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
index 1c6f4fca50e..f7c3c86c089 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
@@ -89,6 +89,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setRebalanceMode(SYNC);
         ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setBackups(1);
 
         CacheConfiguration ccfg2 = new CacheConfiguration("non-primitive");
 
@@ -99,6 +100,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
         ccfg2.setWriteSynchronizationMode(FULL_SYNC);
         ccfg2.setRebalanceMode(SYNC);
         ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg2.setBackups(1);
 
         CacheConfiguration ccfg3 = new CacheConfiguration("large");
 
@@ -109,6 +111,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
         ccfg3.setWriteSynchronizationMode(FULL_SYNC);
         ccfg3.setRebalanceMode(SYNC);
         ccfg3.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg3.setBackups(1);
 
         CacheConfiguration ccfg4 = new CacheConfiguration("tiny");
 
@@ -116,6 +119,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
         ccfg4.setWriteSynchronizationMode(FULL_SYNC);
         ccfg4.setRebalanceMode(SYNC);
         ccfg4.setAffinity(new RendezvousAffinityFunction(1, null));
+        ccfg4.setBackups(1);
 
         CacheConfiguration ccfg5 = new CacheConfiguration("atomic");
 
@@ -126,6 +130,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
         ccfg5.setWriteSynchronizationMode(FULL_SYNC);
         ccfg5.setRebalanceMode(SYNC);
         ccfg5.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg5.setBackups(1);
 
         if (!client)
             cfg.setCacheConfiguration(ccfg, ccfg2, ccfg3, ccfg4, ccfg5);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
index bb018d387fc..05ee63dd098 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
@@ -27,18 +27,22 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -52,6 +56,12 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_SEGMENT_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+
 /**
  *
  */
@@ -59,6 +69,12 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
     /** */
     private static final int KEYS_COUNT = SF.applyLB(10_000, 2_000);
 
+    /** Index of Ignite node with a reduced WAL buffer size. */
+    private static final int smallWalBufSizeNodeIdx = 0;
+
+    /** Names of the nodes on which a system critical failure has been detected. */
+    private final Set<String> failedNodes = new ConcurrentSkipListSet<>();
+
     /**
      * @return Ignite instance for testing.
      */
@@ -69,6 +85,27 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
         return grid(0);
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (getTestIgniteInstanceIndex(gridName) == smallWalBufSizeNodeIdx) {
+            cfg.getDataStorageConfiguration()
+                .setWalBufferSize(DFLT_WAL_SEGMENT_SIZE / 4)
+                .setWalSegmentSize(DFLT_WAL_SEGMENT_SIZE / 4);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return (ignite, failureCtx) -> {
+            failedNodes.add(ignite.name());
+            return true;
+        };
+    }
+
     /**
      * @return Cache for testing.
      * @throws Exception If failed.
@@ -273,6 +310,58 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
         assertNull(cache1.get(1));
     }
 
+    /**
+     * Tests that putting a large entry, whose size is greater than the WAL buffer/segment size, results in CacheException.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutLargeEntry() throws Exception {
+        assertTrue(
+            "Primary key should correspond to the node with small wal buffer size.",
+            smallWalBufSizeNodeIdx == 0);
+
+        IgniteCache<Integer, byte[]> atomicCache = grid(0).cache("atomic");
+        Integer atomicPrimaryKey = primaryKey(atomicCache);
+
+        // New value that is larger than the reduced WAL segment / WAL buffer size.
+        byte[] newVal = new byte[DFLT_WAL_SEGMENT_SIZE / 2];
+
+        assertThrows(
+            log,
+            () -> atomicCache.put(atomicPrimaryKey, newVal),
+            IgniteException.class,
+            null);
+        assertNull("Unexpected non-null value.", atomicCache.get(atomicPrimaryKey));
+        assertTrue("Unexpected system critical error.", failedNodes.isEmpty());
+
+        // Check backup scenario.
+        if (gridCount() > 1) {
+            Integer atomicBackupKey = backupKey(atomicCache);
+            Ignite primaryNode = primaryNode(atomicBackupKey, atomicCache.getName());
+
+            // The primary node should be updated successfully;
+            // however, the backup node should fail because the new entry is too large to be written to the WAL.
+            assertThrows(
+                log,
+                () -> atomicCache.put(atomicBackupKey, newVal),
+                IgniteException.class,
+                "Failed to update keys");
+
+            assertThat(
+                "Unexpected value.",
+                primaryNode.cache(atomicCache.getName()).get(atomicBackupKey),
+                is(newVal));
+
+            assertThat(
+                "Failure handler was not triggered on backup node.",
+                failedNodes,
+                hasItem(grid(0).name()));
+
+            assertFalse("Unexpected system critical error(s).", failedNodes.size() > 1);
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -1075,7 +1164,7 @@ public abstract class IgniteDbPutGetAbstractTest extends IgniteDbAbstractTest {
 
             int cnt = 0;
 
-            for (Cache.Entry<DbKey, DbValue> e : cache0.localEntries()) {
+            for (Cache.Entry<DbKey, DbValue> e : cache0.localEntries(CachePeekMode.PRIMARY)) {
                 cnt++;
 
                 allKeys.add(e.getKey());
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
index d91fce6abeb..7e47604e73a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest.java
@@ -17,27 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
 
 /**
  *
  */
 public class IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest extends IgniteDbSingleNodeWithIndexingPutGetTest {
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setDataStorageConfiguration(
-            new DataStorageConfiguration()
-                .setWalMode(WALMode.LOG_ONLY)
-        );
-
-        return cfg;
-    }
-
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         cleanPersistenceDir();