You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/07 11:47:49 UTC

[ignite] branch master updated: IGNITE-13812 Fixed possible ClassCastException on checkpoint start with disabled WAL. - Fixes #8540.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e8b9170  IGNITE-13812 Fixed possible ClassCastException on checkpoint start with disabled WAL. - Fixes #8540.
e8b9170 is described below

commit e8b9170a07f0cdbbd8afb2a06553eb9288465fd3
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Dec 7 14:16:30 2020 +0300

    IGNITE-13812 Fixed possible ClassCastException on checkpoint start with disabled WAL. - Fixes #8540.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../persistence/checkpoint/CheckpointEntry.java    |  5 +-
 .../persistence/checkpoint/CheckpointHistory.java  | 63 +++++++++++++++-------
 .../checkpoint/CheckpointMarkersStorage.java       |  6 +--
 3 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
index 670723b..eef8dac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
@@ -355,9 +355,10 @@ public class CheckpointEntry {
 
                         grpStates = remap(stateRec);
                     }
-                    else
-                        initEx = new IgniteCheckedException(
+                    else {
+                        throw new IgniteCheckedException(
                             "Failed to find checkpoint record at the given WAL pointer: " + ptr);
+                    }
                 }
                 catch (IgniteCheckedException e) {
                     initEx = e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 999eba4..1112579 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
@@ -195,9 +196,10 @@ public class CheckpointHistory {
      * finished yet.
      *
      * @param entry Entry to add.
+     * @param cacheStates Cache states map.
      */
-    public void addCheckpoint(CheckpointEntry entry) {
-        addCpToEarliestCpMap(entry);
+    public void addCheckpoint(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
+        addCpCacheStatesToEarliestCpMap(entry, cacheStates);
 
         histMap.put(entry.timestamp(), entry);
     }
@@ -232,7 +234,7 @@ public class CheckpointHistory {
                     iter.remove();
             }
 
-            addCpToEarliestCpMap(entry);
+            addCpGroupStatesToEarliestCpMap(entry, states);
         }
         catch (IgniteCheckedException ex) {
             U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
@@ -261,32 +263,57 @@ public class CheckpointHistory {
      * Add last checkpoint to map of the earliest checkpoints.
      *
      * @param entry Checkpoint entry.
+     * @param cacheStates Cache states map.
      */
-    private void addCpToEarliestCpMap(CheckpointEntry entry) {
-        try {
-            Map<Integer, CheckpointEntry.GroupState> states = entry.groupState(wal);
-
-            for (Integer grpId : states.keySet()) {
-                CheckpointEntry.GroupState grpState = states.get(grpId);
+    private void addCpCacheStatesToEarliestCpMap(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
+        for (Integer grpId : cacheStates.keySet()) {
+            CacheState cacheState = cacheStates.get(grpId);
 
-                for (int pIdx = 0; pIdx < grpState.size(); pIdx++) {
-                    int part = grpState.getPartitionByIndex(pIdx);
+            for (int pIdx = 0; pIdx < cacheState.size(); pIdx++) {
+                int part = cacheState.partitionByIndex(pIdx);
 
-                    GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);
+                GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);
 
-                    if (!earliestCp.containsKey(grpPartKey))
-                        earliestCp.put(grpPartKey, entry);
-                }
+                addPartitionToEarliestCheckpoints(grpPartKey, entry);
             }
         }
-        catch (IgniteCheckedException ex) {
-            U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
+    }
 
-            earliestCp.clear();
+    /**
+     * Add last checkpoint to map of the earliest checkpoints.
+     *
+     * @param entry Checkpoint entry.
+     * @param cacheGrpStates Group states map.
+     */
+    private void addCpGroupStatesToEarliestCpMap(
+        CheckpointEntry entry,
+        Map<Integer, CheckpointEntry.GroupState> cacheGrpStates
+    ) {
+        for (Integer grpId : cacheGrpStates.keySet()) {
+            CheckpointEntry.GroupState grpState = cacheGrpStates.get(grpId);
+
+            for (int pIdx = 0; pIdx < grpState.size(); pIdx++) {
+                int part = grpState.getPartitionByIndex(pIdx);
+
+                GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);
+
+                addPartitionToEarliestCheckpoints(grpPartKey, entry);
+            }
         }
     }
 
     /**
+     * Add entry to earliest checkpoint map. Ignore is such key is already present.
+     *
+     * @param grpPartKey Key that consists of cache group id and partition index.
+     * @param entry Checkpoint entry.
+     */
+    private void addPartitionToEarliestCheckpoints(GroupPartitionId grpPartKey, CheckpointEntry entry) {
+        if (!earliestCp.containsKey(grpPartKey))
+            earliestCp.put(grpPartKey, entry);
+    }
+
+    /**
      * @return {@code true} if there is space for next checkpoint.
      */
     public boolean hasSpace() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
index 8b83c84..b421a33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
@@ -328,9 +328,7 @@ public class CheckpointMarkersStorage {
 
         Map<Integer, CacheState> cacheGrpStates = null;
 
-        // Do not hold groups state in-memory if there is no space in the checkpoint history to prevent possible OOM.
-        // In this case the actual group states will be readed from WAL by demand.
-        if (rec != null && cpHistory.hasSpace())
+        if (rec != null)
             cacheGrpStates = rec.cacheGroupStates();
 
         return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
@@ -426,7 +424,7 @@ public class CheckpointMarkersStorage {
         );
 
         if (type == CheckpointEntryType.START)
-            cpHistory.addCheckpoint(entry);
+            cpHistory.addCheckpoint(entry, rec.cacheGroupStates());
 
         writeCheckpointEntry(tmpWriteBuf, entry, type, skipSync);