Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/01 14:52:39 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9321: IGNITE-14744 Partition swap on checkpoint

alex-plekhanov commented on a change in pull request #9321:
URL: https://github.com/apache/ignite/pull/9321#discussion_r715677422



##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessorBinarizable.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Nothing changed except this line

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
##########
@@ -100,7 +103,20 @@ public boolean clientOnlyExchange() {
      * @return New caches start requests.
      */
     public Collection<CacheActionData> cacheStartRequests() {
-        return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
+        return cacheStartRequests((ccfg, uuid) -> true);
+    }
+
+    /**
+     * @param filter Cache start requests filtering predicate.
+     * @return New caches start requests.
+     */
+    public Collection<CacheActionData> cacheStartRequests(BiPredicate<CacheConfiguration<?, ?>, @Nullable UUID> filter) {

Review comment:
       This method seems redundant. It is called with a filter only once, yet it adds a performance penalty to every call that needs no filtering. It's better to keep `cacheStartRequests()` free of stream creation and do the filtering only in `GridQueryProcessor`. Or, at least, don't reuse `cacheStartRequests(filter)` in `cacheStartRequests()`.
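
       A minimal sketch of that split, assuming simplified types: `StartRequestsSketch`, the `String` requests and the `filtered` helper are hypothetical stand-ins, not the PR's classes. The no-argument getter returns the existing view, and the one caller that needs filtering does it itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for ExchangeActions; names and types are simplified for illustration. */
final class StartRequestsSketch {
    /** Hypothetical stand-in for the cachesToStart map (cache name -> request). */
    private Map<String, String> cachesToStart;

    /** Registers a start request, creating the map lazily. */
    void add(String cacheName, String req) {
        if (cachesToStart == null)
            cachesToStart = new LinkedHashMap<>();

        cachesToStart.put(cacheName, req);
    }

    /** Hot path: returns a view of the existing values, no stream and no copy. */
    Collection<String> cacheStartRequests() {
        return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
    }

    /** Filtering lives at the single call site that needs it. */
    static List<String> filtered(Collection<String> reqs, Predicate<String> filter) {
        List<String> res = new ArrayList<>();

        for (String req : reqs) {
            if (filter.test(req))
                res.add(req);
        }

        return res;
    }
}
```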

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {
+        assert cctx.database().checkpointLockIsHeldByThread();
+        assert src.toFile().exists();
+        assert tag >= 0;
+
+        CacheStoreHolder holder = getHolder(grpId);
+
+        if (holder == null)
+            throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
+                "(cache has not been started): " + grpId);
+
+        CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptor(grpId);
+        DataRegion region = cctx.database().dataRegion(desc.config().getDataRegionName());
+        PageMetrics metrics = region.metrics().cacheGrpPageMetrics(desc.groupId());
+        FileVersionCheckingFactory factory = getPageStoreFactory(grpId, desc.config().isEncryptionEnabled());
+        FilePageStore pageStore = (FilePageStore)getStore(grpId, partId);
+
+        boolean exists = pageStore.exists();
+
+        if (exists)
+            throw new IgniteCheckedException("Previous partition page store must be truncated first: " + partId);
+
+        if (desc == null)
+            throw new IgniteCheckedException("Cache group with given id doesn't exists: " + grpId);
+
+        try {
+            Files.move(src,
+                getPartitionFilePath(cacheWorkDir(desc.config()), partId),
+                StandardCopyOption.ATOMIC_MOVE);
+
+            // Previous page stores may be used by other processes. The link to the instance of a PageStore available
+            // for may internal components, so the best way to share the 'recreation' status is to close the previous
+            // page store instance and to create a new one.
+            return holder.set(partId,
+                factory.createPageStore(getTypeByPartId(partId),
+                    () -> getPartitionFilePath(desc.config(), partId),
+                    metrics.totalPages()::add,

Review comment:
       Shouldn't we reset or add pages to the total pages metric here? Let's add a test for this case.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -771,6 +820,14 @@ private CacheStoreHolder initDir(File cacheWorkDir,
         }
     }
 
+    /**
+     * @param ccfg Cache group configuration.
+     * @param partId Partition id.
+     */
+    @NotNull public Path getPartitionFilePath(CacheConfiguration<?, ?> ccfg, int partId) {

Review comment:
       Can be private. Also can be inlined.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -67,7 +70,7 @@
      * since for instance, due to the node filter there is no cache data on node.
      */
     @GridToStringInclude
-    private final Map<Integer, Set<Integer>> locParts = new HashMap<>();
+    private transient Map<Integer, Set<Integer>> locParts = new HashMap<>();

Review comment:
       What's wrong with the default serialization?
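
       For reference, a `HashMap<Integer, Set<Integer>>` field round-trips through default Java serialization without a custom `writeObject`/`readObject` pair, even when the field is `final`. A minimal sketch, assuming a hypothetical class (`DefaultSerializationSketch` and its methods are not part of the PR):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical class with the same field shape as locParts. */
final class DefaultSerializationSketch implements Serializable {
    private static final long serialVersionUID = 0L;

    /** Final and non-transient: default serialization handles it. */
    private final Map<Integer, Set<Integer>> locParts = new HashMap<>();

    void add(int grpId, int partId) {
        locParts.computeIfAbsent(grpId, k -> new HashSet<>()).add(partId);
    }

    Map<Integer, Set<Integer>> partitions() {
        return locParts;
    }

    /** Serializes and deserializes the object in memory. */
    static DefaultSerializationSketch roundTrip(DefaultSerializationSketch src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(src);
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DefaultSerializationSketch)in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```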

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -1285,7 +1285,7 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
 
         assert locPart != null && locPart.reservations() > 0;
 
-        locPart.dataStore().preload();
+        dataStore(locPart).preload();

Review comment:
       Looks like this is redundant.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3397,6 +3408,34 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
         top.globalPartSizes(partSizes);
     }
 
+    /**
+     * @param top Topology to reset all states.
+     */
+    private void resetAllPartitionStates(GridDhtPartitionTopology top) {
+        assert crd.isLocal();
+
+        List<List<ClusterNode>> ideal = cctx.affinity().affinity(top.groupId()).idealAssignmentRaw();
+
+        resetStateByCondition(IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(part -> part, part -> emptySet())),
+            state -> true,
+            () -> AffinityTopologyVersion.NONE, // The last major affinity version will be used (e.g. node left, node join event).
+            top,
+            emptySet(),
+            (partId, nodeId) -> F.transform(ideal.get(partId), ClusterNode::id).contains(nodeId),

Review comment:
       `F.transform` creates an `ArrayList` and copies all elements on each call; let's avoid this overhead.
   For example: `(partId, nodeId) -> F.exist(ideal.get(partId), n -> n.id().equals(nodeId)),`
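
       The same idea in plain JDK terms, as a sketch only: `Node` is a hypothetical stand-in for `ClusterNode`, and the loop scans the ideal assignment in place instead of building a list of ids first.

```java
import java.util.List;
import java.util.UUID;

final class IdealOwnerCheckSketch {
    /** Hypothetical stand-in for ClusterNode: only the id matters here. */
    static final class Node {
        private final UUID id;

        Node(UUID id) {
            this.id = id;
        }

        UUID id() {
            return id;
        }
    }

    /** Scans the assignment in place; no intermediate collection is allocated per call. */
    static boolean isIdealOwner(List<List<Node>> ideal, int partId, UUID nodeId) {
        for (Node n : ideal.get(partId)) {
            if (n.id().equals(nodeId))
                return true;
        }

        return false;
    }
}
```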

##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessor.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Nothing changed except this line

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.
      */
-    private void resetOwnersByCounter(GridDhtPartitionTopology top,
-        Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
-        Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
-        Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
-
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
-            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
-            partSizes.put(e.getKey(), e.getValue().size);
-        }
+    private void resetStateByCondition(
+        Map<Integer, Set<UUID>> partsToReset,
+        Predicate<GridDhtPartitionState> statesToReset,

Review comment:
       IMO `EnumSet` is more readable than `Predicate`, but it's up to you.
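
       A sketch of the `EnumSet` variant, assuming a local `State` enum as a stand-in for `GridDhtPartitionState` (the constants listed are an assumption for illustration). The set names the states explicitly and prints readably, which a lambda does not.

```java
import java.util.EnumSet;
import java.util.Set;

final class StatesToResetSketch {
    /** Hypothetical stand-in for GridDhtPartitionState; the constants are assumed for illustration. */
    enum State { MOVING, OWNING, RENTING, EVICTED, LOST }

    /** The states to reset are listed explicitly and can be printed in logs or assertions. */
    static final Set<State> OWNING_ONLY = EnumSet.of(State.OWNING);

    /** Equivalent of the always-true predicate `state -> true`. */
    static final Set<State> ALL = EnumSet.allOf(State.class);

    /** Replaces `statesToReset.test(state)` from the predicate-based signature. */
    static boolean shouldReset(Set<State> statesToReset, State state) {
        return statesToReset.contains(state);
    }
}
```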

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3397,6 +3408,34 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
         top.globalPartSizes(partSizes);
     }
 
+    /**
+     * @param top Topology to reset all states.
+     */
+    private void resetAllPartitionStates(GridDhtPartitionTopology top) {
+        assert crd.isLocal();
+
+        List<List<ClusterNode>> ideal = cctx.affinity().affinity(top.groupId()).idealAssignmentRaw();
+
+        resetStateByCondition(IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(part -> part, part -> emptySet())),
+            state -> true,
+            () -> AffinityTopologyVersion.NONE, // The last major affinity version will be used (e.g. node left, node join event).
+            top,
+            emptySet(),
+            (partId, nodeId) -> F.transform(ideal.get(partId), ClusterNode::id).contains(nodeId),
+            IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(p -> p, p -> 0L)));
+
+        Collection<ClusterNode> affNodes = cctx.discovery().cacheGroupAffinityNodes(top.groupId(), AffinityTopologyVersion.NONE);
+
+        for (ClusterNode node : affNodes) {
+            for (Map.Entry<Integer, GridDhtPartitionState> e : top.partitions(node.id()).map().entrySet()) {
+                if (e.getValue() == GridDhtPartitionState.MOVING)
+                    continue;
+
+                assert false : "Partitions must be set to MOVING state on all nodes [node=" + node.id() + ", state=" + e + ']';
+            }
+        }

Review comment:
       `if (U.assertionsEnabled()) ...`
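
       The point of the guard is that the whole verification loop is skipped when assertions are off. A plain-JDK sketch of the pattern, where the local `assertionsEnabled()` is a stand-in for the `U.assertionsEnabled()` call and the `String` states are hypothetical:

```java
import java.util.Map;

final class AssertionGuardSketch {
    /** Detects whether the JVM runs with -ea for this class. */
    static boolean assertionsEnabled() {
        boolean enabled = false;

        // The assignment only executes when assertions are on.
        assert enabled = true;

        return enabled;
    }

    /**
     * Verifies that every partition is MOVING, but only iterates when assertions are enabled.
     *
     * @return Number of visited entries, so the effect of the guard is observable.
     */
    static int checkAllMoving(Map<Integer, String> states) {
        int visited = 0;

        if (assertionsEnabled()) {
            for (Map.Entry<Integer, String> e : states.entrySet()) {
                visited++;

                assert "MOVING".equals(e.getValue()) : "Partition must be in MOVING state: " + e;
            }
        }

        return visited;
    }
}
```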

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1408,27 +1460,59 @@ public int pageSize() {
      *
      */
     private static class CacheStoreHolder extends AbstractList<PageStore> {
-        /** Index store. */
-        private final PageStore idxStore;
-
         /** Partition stores. */
-        private final PageStore[] partStores;
+        private final AtomicReferenceArray<PageStore> stores;
 
         /**
+         * @param idxStore Index page store.
+         * @param partStores Partition page stores.
          */
         CacheStoreHolder(PageStore idxStore, PageStore[] partStores) {
-            this.idxStore = requireNonNull(idxStore);
-            this.partStores = requireNonNull(partStores);
+            assert idxStore.type() == PageStore.TYPE_IDX;
+
+            int len = requireNonNull(partStores).length;
+
+            PageStore[] arr = Arrays.copyOf(partStores, len + 1);
+            arr[len] = requireNonNull(idxStore);
+
+            stores = new AtomicReferenceArray<>(arr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PageStore get(int partId) {

Review comment:
       Actually, it's not quite correct. You override the `get(int index)` method of the `List` class with different semantics (`partId` instead of `index`). This method (and `set` too) is used internally by the `List` class (by the iterator, for example), so there will be a problem with the maximum allowed partition count (MAX_PARTITION_ID+1 partitions): the iterator will try to get the partition with index MAX_PARTITION_ID+1 for the INDEX_PARTITION and will fail with an `IllegalArgumentException`.
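
       A reduced reproduction of that scenario, as a sketch under assumptions: `HolderSketch`, the `String` stores and the tiny `MAX_PARTITION_ID` are hypothetical, and the bounds check is a guess at how `get(partId)` validates its argument. The inherited iterator calls `get(0)` through `get(size() - 1)`, so the last call receives a list index that is not a valid partition id.

```java
import java.util.AbstractList;
import java.util.Arrays;

/** Hypothetical holder: partition stores followed by the index store in one array. */
final class HolderSketch extends AbstractList<String> {
    /** Hypothetical small limit; the real constant is much larger. */
    static final int MAX_PARTITION_ID = 3;

    /** Special partition id that addresses the index store. */
    static final int INDEX_PARTITION = 0xFFFF;

    private final String[] stores;

    HolderSketch(String idxStore, String[] partStores) {
        stores = Arrays.copyOf(partStores, partStores.length + 1);
        stores[partStores.length] = idxStore;
    }

    /** Takes a partition id, not a list index: this is the semantic mismatch. */
    @Override public String get(int partId) {
        if (partId == INDEX_PARTITION)
            return stores[stores.length - 1];

        if (partId < 0 || partId > MAX_PARTITION_ID)
            throw new IllegalArgumentException("Partition id is out of bounds: " + partId);

        return stores[partId];
    }

    /** Counts the index store too, so the iterator walks one slot past the last partition id. */
    @Override public int size() {
        return stores.length;
    }
}
```

       With the maximum number of partitions, `for (String s : holder)` reaches `get(MAX_PARTITION_ID + 1)` and throws. Keeping `get(int)` index-based and exposing the partition lookup under a separate method name would avoid the clash.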

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
##########
@@ -930,28 +943,31 @@ public GridDhtLocalPartition getOrCreatePartition(int p) {
         return loc;
     }
 
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+    /**
+     * @param partId Partition id to create.
+     * @param exit Condition to return partition immediately if it's already created.
+     * @param act Post-processing action for created partition if the previous partition exists.
+     * @return Created partition or the exiting partition.
+     */
+    public GridDhtLocalPartition doForcePartitionCreate(
+        int partId,
+        Predicate<GridDhtPartitionState> exit,
+        BiConsumer<GridDhtPartitionState, GridDhtLocalPartition> act

Review comment:
       The code is much harder to read with so many lambdas. Why not just use an `EnumSet` of states and a boolean flag that controls whether to call `resetUpdateCounter()`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.
      */
-    private void resetOwnersByCounter(GridDhtPartitionTopology top,
-        Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
-        Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
-        Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
-
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
-            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
-            partSizes.put(e.getKey(), e.getValue().size);
-        }
+    private void resetStateByCondition(
+        Map<Integer, Set<UUID>> partsToReset,
+        Predicate<GridDhtPartitionState> statesToReset,
+        Supplier<AffinityTopologyVersion> awaitAffVer,

Review comment:
       Why do we need `Supplier` here? Why not just `AffinityTopologyVersion`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
##########
@@ -3876,8 +3893,13 @@ public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFutu
         Set<Integer> cacheIds = emptySet();
 
         if (acts != null) {
-            if (!F.isEmpty(acts.cacheStartRequests())) {
-                cacheIds = acts.cacheStartRequests().stream()
+            // THe index rebuild for restoring caches will be explicitly completed under the restore manager,

Review comment:
       `THe` -> `The`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
##########
@@ -289,26 +308,37 @@ public void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor
     void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
         assert req.resetLostPartitions() : req;
 
-        cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+        cachesToResetLostParts = add(cachesToResetLostParts, req, desc, null);
     }
 
     /**
      * @param grpDesc Group descriptor.
      */
-    void addCacheGroupToStart(CacheGroupDescriptor grpDesc) {
+    void addCacheGroupToStart(CacheGroupDescriptor grpDesc, @Nullable UUID ownerId) {
         assert grpDesc != null;
 
         if (cacheGrpsToStart == null)
             cacheGrpsToStart = new ArrayList<>();
 
-        cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
+        cacheGrpsToStart.add(new CacheGroupActionData(grpDesc, false, ownerId));
     }
 
     /**
      * @return Cache groups to start.
      */
     public List<CacheGroupActionData> cacheGroupsToStart() {
-        return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
+        return cacheGroupsToStart((ccfg, uuid) -> true);

Review comment:
       Let's keep this method as is (do not reuse `cacheGroupsToStart`) to avoid extra object creation by streams.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
##########
@@ -628,7 +628,7 @@ private void removeCacheData(int cacheId) {
     /** {@inheritDoc} */
     @Nullable @Override public CacheDataRow read(GridCacheContext cctx, KeyCacheObject key)
         throws IgniteCheckedException {
-        CacheDataStore dataStore = dataStore(cctx, key);
+        CacheDataStore dataStore = dataStore(cctx.affinity().partition(key), false);

Review comment:
       Changed back?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1012,15 +1062,33 @@ else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         if (files == null)
             return Collections.emptyList();
 
-        return Arrays.stream(dir.listFiles())
+        return Arrays.stream(files)
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
-                f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
+            .filter(f -> CACHE_DIR_FILTER.test(f.getName()))
             .filter(f -> names.test(cacheGroupName(f)))
             .collect(Collectors.toList());
     }
 
+    /**
+     * @param dir Directory to check.
+     * @param grpId Cache group id
+     * @return Files that match cache or cache group pattern.
+     */
+    public static File cacheDirectory(File dir, int grpId) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return null;
+
+        return Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(f -> CACHE_DIR_FILTER.test(f.getName()))

Review comment:
       Do we really need the metastorage directory here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {

Review comment:
       The method name doesn't seem quite relevant. At the very least, javadoc should be added to explain what exactly is recreated.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -133,6 +135,12 @@
     /** */
     public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
 
+    /** */
+    public static final Predicate<String> CACHE_DIR_FILTER = dirName ->

Review comment:
       Since metastorage is not a cache, perhaps it's better to name this predicate something like `DATA_DIR_FILTER`?
   Let's use `Predicate<File>` instead of `Predicate<String>` to avoid another nesting level.
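
       A sketch of what that could look like. The values of `CACHE_DIR_PREFIX` and `METASTORAGE_DIR_NAME` below are assumptions made for the example; only `CACHE_GRP_DIR_PREFIX` is visible in this diff.

```java
import java.io.File;
import java.util.function.Predicate;

final class DataDirFilterSketch {
    /** Assumed value; not shown in this diff. */
    static final String CACHE_DIR_PREFIX = "cache-";

    /** Value taken from the diff above. */
    static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";

    /** Assumed value of MetaStorage.METASTORAGE_DIR_NAME; not shown in this diff. */
    static final String METASTORAGE_DIR_NAME = "metastorage";

    /** Accepts cache, cache group and metastorage directories by name. */
    static final Predicate<File> DATA_DIR_FILTER = dir -> {
        String name = dir.getName();

        return name.startsWith(CACHE_DIR_PREFIX) ||
            name.startsWith(CACHE_GRP_DIR_PREFIX) ||
            name.equals(METASTORAGE_DIR_NAME);
    };
}
```

       The stream call then becomes `.filter(DATA_DIR_FILTER)` instead of `.filter(f -> CACHE_DIR_FILTER.test(f.getName()))`.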

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {
+        assert cctx.database().checkpointLockIsHeldByThread();
+        assert src.toFile().exists();
+        assert tag >= 0;
+
+        CacheStoreHolder holder = getHolder(grpId);
+
+        if (holder == null)

Review comment:
       `{ ... }`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.

Review comment:
       witch -> which

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+abstract class AbstractSnapshotMessage implements Message {
+    /** Unique request id. */
+    private String rqId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected AbstractSnapshotMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param rqId Unique snapshot name.
+     */
+    protected AbstractSnapshotMessage(String rqId) {
+        assert U.alphanumericUnderscore(rqId) : rqId;
+
+        this.rqId = rqId;
+    }
+
+    /**
+     * @return Unique snapshot name.
+     */
+    public String requestId() {

Review comment:
       Why do the method name and the javadoc differ? Can we rename the field and methods to `snapshotName`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -987,14 +1044,7 @@ public void readCacheConfigurations(File dir, Map<String, StoredCacheData> ccfgs
             if (conf.exists() && conf.length() > 0) {
                 StoredCacheData cacheData = readCacheData(conf);
 
-                String cacheName = cacheData.config().getName();
-
-                if (!ccfgs.containsKey(cacheName))
-                    ccfgs.put(cacheName, cacheData);
-                else {
-                    U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "

Review comment:
       Shouldn't we keep this warning?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().
+            collect(Collectors.toMap(Map.Entry::getKey,
+                e -> new HashSet<>(e.getValue())));
+    }
+
+    /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out all elements in the proper order.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reconstitute the <tt>HashMap</tt> instance of partitions and cache groups from a stream. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = new HashMap<>(size);
+
+        // Read in all elements in the proper order.
+        for (int i = 0; i < size; i++) {
+            int grpId = s.readInt();
+            int total = s.readInt();
+
+            if (total < 0)
+                throw new InvalidObjectException("Illegal size: " + total);
+
+            Set<Integer> parts = new HashSet<>(total);

Review comment:
       `U.newHashSet(total)`
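
       The reason for the helper: the constructor argument of `HashSet`/`HashMap` is an initial capacity, not an expected size, so with the default 0.75 load factor the table may still be resized before the expected number of elements is inserted. A plain-JDK sketch of the idea (the names below are hypothetical, and this is not Ignite's implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PresizedCollectionsSketch {
    /** Smallest capacity that holds expSize entries under the default 0.75 load factor. */
    static int capacityFor(int expSize) {
        if (expSize < 0)
            throw new IllegalArgumentException("Negative expected size: " + expSize);

        // ceil(expSize / 0.75) in integer arithmetic, clamped to the int range.
        return (int)Math.min(Integer.MAX_VALUE, (expSize * 4L + 2) / 3);
    }

    /** Creates a set that can take expSize elements without resizing. */
    static <T> Set<T> newHashSet(int expSize) {
        return new HashSet<>(capacityFor(expSize));
    }

    /** Creates a map that can take expSize entries without resizing. */
    static <K, V> Map<K, V> newHashMap(int expSize) {
        return new HashMap<>(capacityFor(expSize));
    }
}
```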

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().

Review comment:
       Sometimes you call the `partitions()` method twice for each cache group, which is very inefficient: every invocation creates a new stream, a new map and new sets. If you are afraid of accidental collection modification, it's better to wrap the collections in `Collections.unmodifiableMap`/`Collections.unmodifiableSet` once and then always return that unmodifiable instance.
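
       A minimal sketch of that approach, assuming a hypothetical `PartitionsViewSketch` class in place of `SnapshotMetadata`: the read-only wrappers are built once in the constructor, and `partitions()` returns the same instance on every call.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the metadata class; only the partitions map is modelled. */
final class PartitionsViewSketch {
    /** Read-only view over the partitions map, created once. */
    private final Map<Integer, Set<Integer>> locPartsView;

    PartitionsViewSketch(Map<Integer, Set<Integer>> parts) {
        Map<Integer, Set<Integer>> locParts = new HashMap<>();

        // Copy defensively once and wrap every inner set so it cannot be modified.
        for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
            locParts.put(e.getKey(), Collections.unmodifiableSet(new HashSet<>(e.getValue())));

        locPartsView = Collections.unmodifiableMap(locParts);
    }

    /** No stream, no copy: the same unmodifiable view is returned every time. */
    Map<Integer, Set<Integer>> partitions() {
        return locPartsView;
    }
}
```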

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRequestMessage.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class SnapshotRequestMessage extends AbstractSnapshotMessage {
+    /** Snapshot request message type (value is {@code 177}). */

Review comment:
       `@code 178`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().
+            collect(Collectors.toMap(Map.Entry::getKey,
+                e -> new HashSet<>(e.getValue())));
+    }
+
+    /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out all elements in the proper order.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reconstitute the <tt>HashMap</tt> instance of partitions and cache groups from a stream. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = new HashMap<>(size);

Review comment:
       `U.newHashMap(size)`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseMessage.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class SnapshotResponseMessage extends AbstractSnapshotMessage {
+    /** Snapshot response message type (value is {@code 178}). */

Review comment:
       `@code 179`



