You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/08/05 14:24:24 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #7941: IGNITE-12843

alex-plekhanov commented on a change in pull request #7941:
URL: https://github.com/apache/ignite/pull/7941#discussion_r465676384



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();

Review comment:
       `Keys` -> `keys`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)

Review comment:
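       Is it possible for `keys` to be null here? `computeIfAbsent` returns either the existing list or the newly created one, so this check looks unreachable.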

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();

Review comment:
       You don't need a Map of Maps here. You can use a plain structure with flat (walIdx, grpId, keyId) tuples (a ConcurrentLinkedQueue, for example). This structure will be naturally ordered, since you call reserveWalKey under the lock and ctx.cache().context().wal().currentSegment() is always increasing.
   This will greatly simplify all methods where trackedWalSegments is used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)
+            return null;
+
+        GroupKey prevKey = F.first(keys);
+
+        GroupKey newKey = new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key()));
+
+        keys.add(0, newKey);
+
+        // Remove the duplicate key from the tail of the list if exists.
+        keys.subList(1, keys.size()).remove(newKey);
+
+        return prevKey;
+    }
+
+    /**
+     * Put new unused key.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     */
+    void putUnused(int grpId, GroupKeyEncrypted newEncKey) {
+        grpKeys.get(grpId).add(new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key())));
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param encryptedKeys Encrypted keys.
+     */
+    void put(int grpId, List<GroupKeyEncrypted> encryptedKeys) {
+        List<GroupKey> keys = new CopyOnWriteArrayList<>();
+
+        for (GroupKeyEncrypted encrKey : encryptedKeys)
+            keys.add(new GroupKey(encrKey.id(), encSpi.decryptKey(encrKey.key())));
+
+        grpKeys.put(grpId, keys);
+    }
+
+    /**
+     * @return Cache group identifiers for which encryption keys are stored.
+     */
+    Set<Integer> groups() {
+        return grpKeys.keySet();
+    }
+
+    /**
+     * Remove encrytion keys associated with the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     */
+    void remove(int grpId) {
+        grpKeys.remove(grpId);
+    }
+
+    /**
+     * Convert encryption keys to WAL logical record that stores encryption keys.
+     */
+    MasterKeyChangeRecord toMasterKeyChangeRecord() {

Review comment:
       I think you should not work with WAL records in this class. Just return reencryptedKeys and wrap them into a MasterKeyChangeRecord in the encryption manager.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {

Review comment:
       Perhaps this should be renamed to `getDefault`, `getCurrent` or `getActive`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {

Review comment:
       Perhaps the method name should be changed to something like `changeDefaultKey`, `setDefaultKey`, `setCurrentKey` or `setActiveKey`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)
+            return null;
+
+        GroupKey prevKey = F.first(keys);
+
+        GroupKey newKey = new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key()));
+
+        keys.add(0, newKey);
+
+        // Remove the duplicate key from the tail of the list if exists.
+        keys.subList(1, keys.size()).remove(newKey);
+
+        return prevKey;
+    }
+
+    /**
+     * Put new unused key.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     */
+    void putUnused(int grpId, GroupKeyEncrypted newEncKey) {
+        grpKeys.get(grpId).add(new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key())));
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param encryptedKeys Encrypted keys.
+     */
+    void put(int grpId, List<GroupKeyEncrypted> encryptedKeys) {
+        List<GroupKey> keys = new CopyOnWriteArrayList<>();
+
+        for (GroupKeyEncrypted encrKey : encryptedKeys)
+            keys.add(new GroupKey(encrKey.id(), encSpi.decryptKey(encrKey.key())));
+
+        grpKeys.put(grpId, keys);
+    }
+
+    /**
+     * @return Cache group identifiers for which encryption keys are stored.
+     */
+    Set<Integer> groups() {
+        return grpKeys.keySet();
+    }
+
+    /**
+     * Remove encrytion keys associated with the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     */
+    void remove(int grpId) {
+        grpKeys.remove(grpId);
+    }
+
+    /**
+     * Convert encryption keys to WAL logical record that stores encryption keys.
+     */
+    MasterKeyChangeRecord toMasterKeyChangeRecord() {
+        List<T3<Integer, Byte, byte[]>> reencryptedKeys = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+
+            for (GroupKey grpKey : entry.getValue()) {
+                byte keyId = grpKey.id();
+                byte[] encryptedKey = encSpi.encryptKey(grpKey.key());
+
+                reencryptedKeys.add(new T3<>(grpId, keyId, encryptedKey));
+            }
+        }
+
+        return new MasterKeyChangeRecord(encSpi.getMasterKeyName(), reencryptedKeys);
+    }
+
+    /**
+     * Load encryption keys from WAL logical record that stores encryption keys.
+     *
+     * @param rec Logical record that stores encryption keys.
+     */
+    void fromMasterKeyChangeRecord(MasterKeyChangeRecord rec) {
+        for (T3<Integer, Byte, byte[]> entry : rec.getGrpKeys()) {
+            int grpId = entry.get1();
+            int keyId = entry.get2() & 0xff;
+            byte[] key = entry.get3();
+
+            grpKeys.computeIfAbsent(grpId, list ->
+                new CopyOnWriteArrayList<>()).add(new GroupKey(keyId, encSpi.decryptKey(key)));
+        }
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param ids Key identifiers for deletion.
+     * @return {@code True} if the keys have been deleted.
+     */
+    boolean removeKeysById(int grpId, Set<Integer> ids) {
+        return removeKeysById(grpKeys.get(grpId), ids);

Review comment:
       `removeKeysById(List<GroupKey> keys, Set<Integer> ids)` is redundant; it can be replaced by `removeIf(key -> ids.contains(key.unsignedId()))`.
   For example, here:
   `grpKeys.get(grpId).removeIf(key -> ids.contains(key.unsignedId()));`
   Also, do we need to check `grpKeys.get(grpId)` for `null`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)
+            return null;
+
+        GroupKey prevKey = F.first(keys);
+
+        GroupKey newKey = new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key()));
+
+        keys.add(0, newKey);
+
+        // Remove the duplicate key from the tail of the list if exists.
+        keys.subList(1, keys.size()).remove(newKey);
+
+        return prevKey;
+    }
+
+    /**
+     * Put new unused key.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     */
+    void putUnused(int grpId, GroupKeyEncrypted newEncKey) {

Review comment:
       Perhaps rename this method to `addKey`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)
+            return null;
+
+        GroupKey prevKey = F.first(keys);
+
+        GroupKey newKey = new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key()));
+
+        keys.add(0, newKey);
+
+        // Remove the duplicate key from the tail of the list if exists.
+        keys.subList(1, keys.size()).remove(newKey);
+
+        return prevKey;
+    }
+
+    /**
+     * Put new unused key.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     */
+    void putUnused(int grpId, GroupKeyEncrypted newEncKey) {
+        grpKeys.get(grpId).add(new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key())));
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param encryptedKeys Encrypted keys.
+     */
+    void put(int grpId, List<GroupKeyEncrypted> encryptedKeys) {

Review comment:
       Perhaps rename this method to `setGroupKeys`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupEncryptionKeys.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serves for managing encryption keys and related datastructure located in the heap.
+ */
+class CacheGroupEncryptionKeys {
+    /** Group encryption keys. */
+    private final Map<Integer, List<GroupKey>> grpKeys = new ConcurrentHashMap<>();
+
+    /** WAL segments encrypted with previous encrypted keys, mapped to cache group encryption key identifiers. */
+    private final Map<Long, Map<Integer, Set<Integer>>> trackedWalSegments = new ConcurrentSkipListMap<>();
+
+    /** Encryption spi. */
+    private final EncryptionSpi encSpi;
+
+    /**
+     * @param encSpi Encryption spi.
+     */
+    CacheGroupEncryptionKeys(EncryptionSpi encSpi) {
+        this.encSpi = encSpi;
+    }
+
+    /**
+     * Returns group encryption key.
+     *
+     * @param grpId Cache group ID.
+     * @return Group encryption key with identifier, that was set for writing.
+     */
+    GroupKey get(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (F.isEmpty(keys))
+            return null;
+
+        return keys.get(0);
+    }
+
+    /**
+     * Returns group encryption key with specified identifier.
+     *
+     * @param grpId Cache group ID.
+     * @param keyId Encryption key ID.
+     * @return Group encryption key.
+     */
+    GroupKey get(int grpId, int keyId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        for (GroupKey groupKey : keys) {
+            if (groupKey.unsignedId() == keyId)
+                return groupKey;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets information about existing encryption keys for the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     * @return Map of the key identifier with hash code of encryption key.
+     */
+    Map<Integer, Integer> info(int grpId) {
+        List<GroupKey> keys = grpKeys.get(grpId);
+
+        if (keys == null)
+            return null;
+
+        Map<Integer, Integer> keysInfo = new LinkedHashMap<>();
+
+        for (GroupKey groupKey : keys)
+            keysInfo.put(groupKey.unsignedId(), Arrays.hashCode(U.toBytes(groupKey.key())));
+
+        return keysInfo;
+    }
+
+    /**
+     * @return Local encryption keys.
+     */
+    @Nullable HashMap<Integer, GroupKeyEncrypted> getAll() {
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        HashMap<Integer, GroupKeyEncrypted> Keys = new HashMap<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+            GroupKey grpKey = entry.getValue().get(0);
+
+            Keys.put(grpId, new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+        }
+
+        return Keys;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     *
+     * @return Local encryption keys used for specified cache group.
+     */
+    List<GroupKeyEncrypted> getAll(int grpId) {
+        List<GroupKey> grpKeys = this.grpKeys.get(grpId);
+
+        if (F.isEmpty(grpKeys))
+            return null;
+
+        List<GroupKeyEncrypted> encryptedKeys = new ArrayList<>(grpKeys.size());
+
+        for (GroupKey grpKey : grpKeys)
+            encryptedKeys.add(new GroupKeyEncrypted(grpKey.unsignedId(), encSpi.encryptKey(grpKey.key())));
+
+        return encryptedKeys;
+    }
+
+    /**
+     * Put new encryption key and set it for writing.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     * @return Previous encryption key for writing.
+     */
+    GroupKey put(int grpId, GroupKeyEncrypted newEncKey) {
+        assert newEncKey != null;
+
+        List<GroupKey> keys = grpKeys.computeIfAbsent(grpId, list -> new CopyOnWriteArrayList<>());
+
+        if (keys == null)
+            return null;
+
+        GroupKey prevKey = F.first(keys);
+
+        GroupKey newKey = new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key()));
+
+        keys.add(0, newKey);
+
+        // Remove the duplicate key from the tail of the list if exists.
+        keys.subList(1, keys.size()).remove(newKey);
+
+        return prevKey;
+    }
+
+    /**
+     * Put new unused key.
+     *
+     * @param grpId Cache group ID.
+     * @param newEncKey New encrypted key for writing.
+     */
+    void putUnused(int grpId, GroupKeyEncrypted newEncKey) {
+        grpKeys.get(grpId).add(new GroupKey(newEncKey.id(), encSpi.decryptKey(newEncKey.key())));
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param encryptedKeys Encrypted keys.
+     */
+    void put(int grpId, List<GroupKeyEncrypted> encryptedKeys) {
+        List<GroupKey> keys = new CopyOnWriteArrayList<>();
+
+        for (GroupKeyEncrypted encrKey : encryptedKeys)
+            keys.add(new GroupKey(encrKey.id(), encSpi.decryptKey(encrKey.key())));
+
+        grpKeys.put(grpId, keys);
+    }
+
+    /**
+     * @return Cache group identifiers for which encryption keys are stored.
+     */
+    Set<Integer> groups() {
+        return grpKeys.keySet();
+    }
+
+    /**
+     * Remove encrytion keys associated with the specified cache group.
+     *
+     * @param grpId Cache group ID.
+     */
+    void remove(int grpId) {
+        grpKeys.remove(grpId);
+    }
+
+    /**
+     * Convert encryption keys to WAL logical record that stores encryption keys.
+     */
+    MasterKeyChangeRecord toMasterKeyChangeRecord() {
+        List<T3<Integer, Byte, byte[]>> reencryptedKeys = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<GroupKey>> entry : grpKeys.entrySet()) {
+            int grpId = entry.getKey();
+
+            for (GroupKey grpKey : entry.getValue()) {
+                byte keyId = grpKey.id();
+                byte[] encryptedKey = encSpi.encryptKey(grpKey.key());
+
+                reencryptedKeys.add(new T3<>(grpId, keyId, encryptedKey));
+            }
+        }
+
+        return new MasterKeyChangeRecord(encSpi.getMasterKeyName(), reencryptedKeys);
+    }
+
+    /**
+     * Load encryption keys from WAL logical record that stores encryption keys.
+     *
+     * @param rec Logical record that stores encryption keys.
+     */
+    void fromMasterKeyChangeRecord(MasterKeyChangeRecord rec) {

Review comment:
       Pass `rec.getGrpKeys()` to this method instead of the whole WAL record (the method should then be renamed accordingly, of course).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org