Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/26 16:16:56 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #9047: IGNITE-14572 Add metastorage to snapshot

Mmuzaf opened a new pull request #9047:
URL: https://github.com/apache/ignite/pull/9047


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice on the http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622026544



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.

Review comment:
       Typo: Future which will be completed **_will_** all the tasks prior to the pause task completed. (`will` should be `when`.)
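
The constructor in the hunk above runs `pauseTask` immediately so that the future returned by `flush()` is already complete before any pause is requested. A minimal, single-threaded sketch of that idea follows, assuming a plain FIFO queue of `RunnableFuture` tasks. The class `FlushFutureSketch` and its methods are hypothetical names for illustration and are not part of the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

/** Illustrative sketch only; this is not the Ignite DmsDataWriterWorker. */
public final class FlushFutureSketch {
    /** A FutureTask that has been run directly is already a completed future. */
    public static boolean preCompleted() {
        FutureTask<String> task = new FutureTask<>(() -> "AWAIT");

        task.run();

        return task.isDone();
    }

    /** Drains a FIFO queue on the calling thread and records when the marker task completes. */
    public static List<String> drain() {
        BlockingQueue<RunnableFuture<?>> queue = new LinkedBlockingQueue<>();
        List<String> log = new ArrayList<>();

        // Two updates queued ahead of the marker (hypothetical work items).
        queue.offer(new FutureTask<Void>(() -> { log.add("update-1"); return null; }));
        queue.offer(new FutureTask<Void>(() -> { log.add("update-2"); return null; }));

        // The marker plays the role of the pause task: it completes only after earlier tasks ran.
        FutureTask<String> marker = new FutureTask<>(() -> "AWAIT");
        queue.offer(marker);

        log.add("markerDoneBefore=" + marker.isDone());

        RunnableFuture<?> task;

        while ((task = queue.poll()) != null)
            task.run();

        log.add("markerDoneAfter=" + marker.isDone());

        return log;
    }
}
```

Because the queue is drained in insertion order, the marker future can only complete after every task queued before it has been run, which is the property the Javadoc under review is trying to describe.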







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623822624



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       ```suggestion
           Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
           Set<String> keySet0 = new TreeSet<>();
           Set<String> keySet1 = new TreeSet<>();
   
           IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
           snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
   
           stopGrid(0);
   
           IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
           snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
   
           assertEquals(keySet0, keySet1);
   ```
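
The suggestion above replaces a per-key `contains` check with an equality check on two collected key sets. The sketch below illustrates the difference under simplified assumptions: `iterate` is a hypothetical stand-in for the metastorage iteration and the key arrays are made-up data. A `contains` check only proves that the second node's keys are a subset of the first node's keys, while set equality also catches keys missing on the second node.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;

/** Illustrative sketch only; the data and the iterate helper are hypothetical. */
public final class KeySetCompareSketch {
    /** Stand-in for a prefix iteration: feeds every key with the given prefix to the consumer. */
    static void iterate(String[] keys, String prefix, BiConsumer<String, String> consumer) {
        for (String key : keys) {
            if (key.startsWith(prefix))
                consumer.accept(key, "value");
        }
    }

    /** Original style: every key on node 1 must be present on node 0 (subset check). */
    public static boolean containsOnly(String[] node0, String[] node1, String prefix) {
        Set<String> allKeys = new TreeSet<>();
        iterate(node0, prefix, (key, val) -> allKeys.add(key));

        boolean[] ok = {true};
        iterate(node1, prefix, (key, val) -> ok[0] &= allKeys.contains(key));

        return ok[0];
    }

    /** Suggested style: both nodes must hold exactly the same keys. */
    public static boolean sameKeys(String[] node0, String[] node1, String prefix) {
        Set<String> keySet0 = new TreeSet<>();
        Set<String> keySet1 = new TreeSet<>();

        iterate(node0, prefix, (key, val) -> keySet0.add(key));
        iterate(node1, prefix, (key, val) -> keySet1.add(key));

        return keySet0.equals(keySet1);
    }
}
```

With node 0 holding `P_0, P_1, P_2` and node 1 holding only `P_0, P_1`, the subset check passes while the equality check fails, so the suggested form is the stricter assertion.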







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622228541



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -69,7 +69,16 @@ public DmsWorkerStatus status() {
     private volatile ReadWriteMetastorage metastorage;
 
     /** */
-    private volatile boolean firstStart = true;
+    private volatile RunnableFuture<?> curTask;

Review comment:
       I see that `curTask` is used only locally in the `body` method.
   I think it's better to move it into the method as a local variable (I don't see any reason to add volatile reads).
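
A minimal sketch of the local-variable form is shown below, assuming the task is touched only by the thread that drains the queue. `LocalTaskSketch`, `runAll` and `demo` are hypothetical names, and the loop is simplified compared with the worker in the pull request (it uses `poll()` and stops when the queue is empty).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

/** Illustrative sketch only; this is not the Ignite worker implementation. */
public final class LocalTaskSketch {
    /** Runs every queued task on the calling thread and sums the results. */
    public static int runAll(BlockingQueue<RunnableFuture<Integer>> queue) {
        int sum = 0;

        // The current task lives in a local variable: no other thread reads it,
        // so neither a field nor volatile semantics are needed.
        RunnableFuture<Integer> curTask;

        while ((curTask = queue.poll()) != null) {
            curTask.run();

            try {
                sum += curTask.get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }

        return sum;
    }

    /** Queues three tasks returning 1, 2 and 3, then drains them. */
    public static int demo() {
        BlockingQueue<RunnableFuture<Integer>> queue = new LinkedBlockingQueue<>();

        for (int i = 1; i <= 3; i++) {
            int val = i;

            queue.offer(new FutureTask<Integer>(() -> val));
        }

        return runAll(queue);
    }
}
```

A local variable also lets the compiler enforce definite assignment before the task is run, which a field does not.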







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621311125



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
##########
@@ -161,6 +171,13 @@ public MetaStorage(
         this.failureProcessor = cctx.kernalContext().failure();
     }
 
+    /**
+     * @return Count of metastorage partitions.
+     */
+    public static Set<Integer> partitions() {
+        return new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION));

Review comment:
       I suggest moving this into a constant.
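
One way the constant could look is sketched below. The class name, the constant name `METASTORAGE_PARTITIONS` and the two partition values are placeholders chosen for illustration; the real partition constants are defined in `MetaStorage` and their values are not shown in this diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch only; names and values are assumptions, not the Ignite source. */
public final class PartitionsConstantSketch {
    /** Placeholder value standing in for the real OLD_METASTORE_PARTITION. */
    static final int OLD_METASTORE_PARTITION = 0;

    /** Placeholder value standing in for the real METASTORE_PARTITION. */
    static final int METASTORE_PARTITION = 1;

    /** Built once and wrapped as unmodifiable so that callers cannot change the shared set. */
    public static final Set<Integer> METASTORAGE_PARTITIONS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION)));

    /** @return The shared set of metastorage partitions. */
    public static Set<Integer> partitions() {
        return METASTORAGE_PARTITIONS;
    }
}
```

With a constant, repeated calls return the same instance instead of allocating a new `HashSet` each time, and the unmodifiable wrapper guards against accidental mutation of the shared set.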







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623073769



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();
+
+        try {
+            return worker.flush();
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pauseMetaStorage(IgniteInternalFuture<?> compFut) {

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -69,7 +69,16 @@ public DmsWorkerStatus status() {
     private volatile ReadWriteMetastorage metastorage;
 
     /** */
-    private volatile boolean firstStart = true;
+    private volatile RunnableFuture<?> curTask;

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623978975



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621405265



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -453,28 +471,22 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+                ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), () -> FilePageStoreManager.cacheDirName(gctx.config()));
+            }
 
-                    partFileLengths.put(pair, store.size());
-                }
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions());
 
-                ccfgs.add(gctx.config());
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions(),
+                    () -> cacheDirName(MetaStorage.METASTORAGE_CACHE_ID));

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621407722



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1012,7 +1009,8 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return Arrays.stream(dir.listFiles())
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+                f.getName().startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1058,6 +1056,8 @@ public static String cacheGroupName(File dir) {
             return name.substring(CACHE_GRP_DIR_PREFIX.length());
         else if (name.startsWith(CACHE_DIR_PREFIX))
             return name.substring(CACHE_DIR_PREFIX.length());
+        else if (name.startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621297816



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -453,28 +471,22 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+                ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), () -> FilePageStoreManager.cacheDirName(gctx.config()));
+            }
 
-                    partFileLengths.put(pair, store.size());
-                }
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions());
 
-                ccfgs.add(gctx.config());
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions(),
+                    () -> cacheDirName(MetaStorage.METASTORAGE_CACHE_ID));

Review comment:
       Why not pass ``MetaStorage.META_STORAGE_DIR_NAME`` directly (instead of ``cacheDirName(MetaStorage.METASTORAGE_CACHE_ID)``)?







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621382449



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1058,6 +1056,8 @@ public static String cacheGroupName(File dir) {
             return name.substring(CACHE_GRP_DIR_PREFIX.length());
         else if (name.startsWith(CACHE_DIR_PREFIX))
             return name.substring(CACHE_DIR_PREFIX.length());
+        else if (name.startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       ``startsWith`` can be replaced with ``equals``
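
The practical difference between the two checks can be sketched with plain strings. This is only an illustration: the value ``metastorage`` and the sample directory names are assumptions, since the thread does not show the actual value of ``MetaStorage.META_STORAGE_DIR_NAME``.

```java
import java.util.Arrays;

public class MetaStorageDirMatch {
    /** Assumed value, for illustration only; the real constant is defined in MetaStorage. */
    static final String META_STORAGE_DIR_NAME = "metastorage";

    /** Prefix match: also accepts any directory whose name merely starts with the constant. */
    public static boolean matchesByPrefix(String dirName) {
        return dirName.startsWith(META_STORAGE_DIR_NAME);
    }

    /** Exact match: accepts only a directory named exactly like the constant. */
    public static boolean matchesExactly(String dirName) {
        return dirName.equals(META_STORAGE_DIR_NAME);
    }

    public static void main(String[] args) {
        // Hypothetical directory names used only to show where the two checks diverge.
        for (String name : Arrays.asList("metastorage", "metastorage_old", "cache-default")) {
            System.out.println(name + ": prefix=" + matchesByPrefix(name)
                + ", exact=" + matchesExactly(name));
        }
    }
}
```

If the metastorage directory always has one fixed name, the exact match avoids picking up unrelated directories that happen to share the prefix.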







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623822624



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       ```suggestion
           Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
           Set<String> keySet0 = new TreeSet<>();
           Set<String> keySet1 = new TreeSet<>();
   
           IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
           snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
   
           stopGrid(0);
   
           IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
           snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
   
           assertEquals("Keys must be the same on all nodes", keySet0, keySet1);
   ```







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623835041



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {
+                    Map<String, SnapshotFutureTask> locMap = GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class,
+                        "locSnpTasks");
+
+                    // Fail the snapshot task with an exception, all metastorage keys must be successfully continued.
+                    locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {}
+
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {}
+        });
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            fut::get,
+            IgniteCheckedException.class,
+            "Test exception");
+
+        stop.set(true);
+
+        // Load future must complete without exceptions, all metastorage keys must be written.
+        updFut.get();
+
+        Set<Integer> allKeys = IntStream.range(0, keyCnt.get()).boxed().collect(Collectors.toSet());
+
+        ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {

Review comment:
       The anonymous ``BiConsumer`` can be replaced with a lambda.
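
A minimal sketch of the two equivalent forms is shown below. The ``iterate`` helper here is a hypothetical stand-in that only mimics the shape of the call used in the test (a key prefix plus a ``BiConsumer<String, Serializable>``); it is not the Ignite implementation.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

public class BiConsumerLambdaSketch {
    /** Hypothetical stand-in for the metastorage iterate call: visits entries whose key starts with the prefix. */
    static void iterate(Map<String, Serializable> data, String prefix, BiConsumer<String, Serializable> cb) {
        data.forEach((key, val) -> {
            if (key.startsWith(prefix))
                cb.accept(key, val);
        });
    }

    /** Collects matching keys using an anonymous class, as in the code under review. */
    public static Set<String> keysAnonymous(Map<String, Serializable> data, String prefix) {
        Set<String> keys = new HashSet<>();

        iterate(data, prefix, new BiConsumer<String, Serializable>() {
            @Override public void accept(String key, Serializable val) {
                keys.add(key);
            }
        });

        return keys;
    }

    /** Collects matching keys using a lambda; the behavior is the same. */
    public static Set<String> keysLambda(Map<String, Serializable> data, String prefix) {
        Set<String> keys = new HashSet<>();

        iterate(data, prefix, (key, val) -> keys.add(key));

        return keys;
    }

    public static void main(String[] args) {
        Map<String, Serializable> data = new HashMap<>();
        data.put("SNAPSHOT_PREFIX_0", "value");
        data.put("unrelated", "value");

        System.out.println(keysAnonymous(data, "SNAPSHOT_PREFIX_").equals(keysLambda(data, "SNAPSHOT_PREFIX_")));
    }
}
```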







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622368937



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();

Review comment:
       Instead of calling ``run()`` in the constructor, we can initialize ``pauseTask`` as an already completed future:
   ```
   private volatile Future<?> pauseTask = CompletableFuture.completedFuture(AWAIT);
   ```
   and cast it to ``RunnableFuture`` in the ``pause`` method, like this:
   ```
   updateQueue.offer((RunnableFuture<?>)(pauseTask = new FutureTask<>(() -> AWAIT)));
   ```
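
Put together, the suggestion could look roughly like the following self-contained sketch. It is simplified and rests on assumptions: ``AWAIT`` is a placeholder object (its real definition is not shown in this thread), ``pause`` takes no arguments here, and ``drain`` is a hypothetical helper that stands in for the worker loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

public class PauseTaskSketch {
    /** Placeholder marker; the real AWAIT constant from the PR is not shown in this thread. */
    private static final Object AWAIT = new Object();

    /** Queue of tasks processed by the (simulated) worker. */
    private final LinkedBlockingQueue<RunnableFuture<?>> updateQueue = new LinkedBlockingQueue<>();

    /** Starts out completed, so the constructor does not need to create and run a FutureTask. */
    private volatile Future<?> pauseTask = CompletableFuture.completedFuture(AWAIT);

    /** Returns the future that completes once the latest pause task has been processed. */
    public Future<?> flush() {
        return pauseTask;
    }

    /** Enqueues a new pause task; the cast is needed because the field is declared as Future. */
    public void pause() {
        updateQueue.offer((RunnableFuture<?>)(pauseTask = new FutureTask<>(() -> AWAIT)));
    }

    /** Hypothetical stand-in for the worker loop: runs every queued task in order. */
    public void drain() {
        RunnableFuture<?> task;

        while ((task = updateQueue.poll()) != null)
            task.run();
    }

    public static void main(String[] args) {
        PauseTaskSketch worker = new PauseTaskSketch();

        System.out.println("done before pause: " + worker.flush().isDone());
        worker.pause();
        System.out.println("done after pause: " + worker.flush().isDone());
        worker.drain();
        System.out.println("done after drain: " + worker.flush().isDone());
    }
}
```

Declaring the field as ``Future<?>`` lets it hold either the pre-completed ``CompletableFuture`` or a queued ``FutureTask``, at the cost of one cast where the task is enqueued.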







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622195440



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed when all the tasks prior to the pause task are completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {

Review comment:
       Maybe it would be better to name this method ``suspend``?







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622029736



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(0)),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(1)),

Review comment:
       ```suggestion
               IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
       ```
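
For context, here is a minimal sketch of why the shorter form is a drop-in replacement. It assumes the callee only reads the set of node indexes and never modifies it. The class and method names below are illustrative and are not part of the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SingletonSetExample {
    /** Original form: a mutable HashSet copied from a one-element list. */
    static Set<Integer> viaHashSet(int idx) {
        return new HashSet<>(Collections.singletonList(idx));
    }

    /** Suggested form: an immutable one-element set, no intermediate list or copy. */
    static Set<Integer> viaSingleton(int idx) {
        return Collections.singleton(idx);
    }

    /** Probes whether the set rejects modification. */
    static boolean isImmutable(Set<Integer> set) {
        try {
            set.add(Integer.MIN_VALUE);
            set.remove(Integer.MIN_VALUE);

            return false;
        }
        catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Set.equals compares contents, so both forms are interchangeable for readers.
        System.out.println(viaHashSet(1).equals(viaSingleton(1)));
        System.out.println(isImmutable(viaSingleton(1)));
        System.out.println(isImmutable(viaHashSet(1)));
    }
}
```

If the callee did modify the set, `Collections.singleton` would throw `UnsupportedOperationException`, and the `HashSet` form would still be required.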

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(0)),

Review comment:
       ```suggestion
               IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623978089



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623732239



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -339,7 +348,13 @@ public boolean start() {
                     throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
 
                 // Create cache group snapshot directory on start in a single thread.
-                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())),
+                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, FilePageStoreManager.cacheDirName(gctx.config())),
+                    "directory for snapshotting cache group",
+                    log);
+            }
+
+            if (withMetaStorage) {
+                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, MetaStorage.METASTORAGE_DIR_NAME),
                     "directory for snapshotting cache group",

Review comment:
       ```suggestion
                           "directory for snapshotting metastorage",
       ```







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621296007



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -453,28 +471,22 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+                ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), () -> FilePageStoreManager.cacheDirName(gctx.config()));
+            }
 
-                    partFileLengths.put(pair, store.size());
-                }
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions());
 
-                ccfgs.add(gctx.config());
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions(),
+                    () -> cacheDirName(MetaStorage.METASTORAGE_CACHE_ID));
             }
 
             pageStore.readConfigurationFiles(ccfgs,
-                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), FilePageStoreManager.cacheDirName(ccfg), ccfgFile)));

Review comment:
       This line is more than 140 characters long.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623824121



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       The TreeSet in the suggested code is only there to make the error output more readable; it is not needed for the comparison.
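
To illustrate the point, here is a minimal sketch with hypothetical class and method names that are not taken from the PR. `Set.equals` compares contents regardless of implementation or iteration order, so sorting matters only when the sets are printed in a failure message.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class KeySetComparison {
    /** Compares key sets by content; ordering and Set implementation are irrelevant. */
    static boolean sameKeys(Set<String> expected, Set<String> actual) {
        return expected.equals(actual);
    }

    /** Builds a failure message with both key sets sorted, purely for readability. */
    static String describe(Set<String> expected, Set<String> actual) {
        return "Keys must be the same on all nodes [expected=" + new TreeSet<>(expected) +
            ", actual=" + new TreeSet<>(actual) + ']';
    }

    public static void main(String[] args) {
        Set<String> node0 = new HashSet<>(Arrays.asList("SNAPSHOT_PREFIX_2", "SNAPSHOT_PREFIX_0"));
        Set<String> node1 = new HashSet<>(Arrays.asList("SNAPSHOT_PREFIX_0", "SNAPSHOT_PREFIX_2"));

        // The comparison works on the plain HashSets.
        System.out.println(sameKeys(node0, node1));

        // Sorting happens only when the message is built.
        System.out.println(describe(node0, node1));
    }
}
```

In a test this would typically appear as an `assertEquals` or `assertTrue` call whose message argument wraps the sets in `TreeSet`, while the compared values stay as they are.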







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621403300



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -453,28 +471,22 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+                ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), () -> FilePageStoreManager.cacheDirName(gctx.config()));
+            }
 
-                    partFileLengths.put(pair, store.size());
-                }
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions());
 
-                ccfgs.add(gctx.config());
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions(),
+                    () -> cacheDirName(MetaStorage.METASTORAGE_CACHE_ID));
             }
 
             pageStore.readConfigurationFiles(ccfgs,
-                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), FilePageStoreManager.cacheDirName(ccfg), ccfgFile)));

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623832162



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */

Review comment:
       As far as I know, there is no point in documenting methods in anonymous classes, as this documentation will be completely ignored.
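
A minimal sketch of the point above, not code from the PR. It uses a hypothetical `Listener` interface in place of Ignite's `CheckpointListener`, and the class and method names are assumptions made for illustration. The Javadoc tool generates no pages for anonymous classes, so the contract is documented once on the interface and the overriding method in the anonymous class carries no `/** {@inheritDoc} */` comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AnonymousListenerSketch {
    /** Hypothetical listener; stands in for the real checkpoint listener. */
    interface Listener {
        /**
         * Called when an event with the given reason begins.
         *
         * @param reason Event reason.
         */
        void onBegin(String reason);
    }

    /** Returns the reasons that mention "Snapshot", in the order they were seen. */
    static List<String> collectSnapshotReasons(List<String> reasons) {
        List<String> seen = new ArrayList<>();

        Listener lsnr = new Listener() {
            // Plain comment only: Javadoc on a member of an anonymous class is never published.
            @Override public void onBegin(String reason) {
                if (reason.contains("Snapshot"))
                    seen.add(reason);
            }
        };

        for (String reason : reasons)
            lsnr.onBegin(reason);

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectSnapshotReasons(Arrays.asList("Snapshot creation", "timeout")));
    }
}
```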







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623824121



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       The TreeSet (in the suggested code) is required only for the output.
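
The suggested code itself is not quoted in this message, so the following is only a sketch of the idea under that caveat. The class `SortedKeysSketch` and its methods are hypothetical names. The comparison works on any `Set`; the `TreeSet` copies are made only so that a failure message lists the keys in a stable, sorted order.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class SortedKeysSketch {
    /** Returns a sorted copy of the keys; used for readable output only. */
    static Set<String> sorted(Set<String> keys) {
        return new TreeSet<>(keys);
    }

    /** Compares two key sets and reports both of them sorted if they differ. */
    static void assertSameKeys(Set<String> expected, Set<String> actual) {
        // Set equality ignores iteration order, so HashSet inputs are fine here.
        if (!expected.equals(actual)) {
            throw new AssertionError("Keys must be the same on all nodes [expected=" +
                sorted(expected) + ", actual=" + sorted(actual) + ']');
        }
    }

    public static void main(String[] args) {
        Set<String> node0 = new HashSet<>(Arrays.asList("k_2", "k_10", "k_1"));
        Set<String> node1 = new HashSet<>(Arrays.asList("k_1", "k_2", "k_10"));

        assertSameKeys(node0, node1);

        System.out.println(sorted(node0));
    }
}
```

Note that the ordering is lexicographic, so `k_10` sorts before `k_2`.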







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622026544



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.

Review comment:
       Typo in the second "will": "Future which will be completed will all the tasks prior to the pause task completed."
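
To illustrate the behaviour the Javadoc describes, here is a simplified sketch. It is not the `DmsDataWriterWorker` implementation, and `QueueMarkerSketch`, `task` and `runUntilMarker` are hypothetical names. It assumes a single consumer draining a FIFO queue: a marker task offered to the queue completes only after every task offered before it has run, while tasks offered later are still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

public class QueueMarkerSketch {
    /** Creates a task that records its name when it runs. */
    static RunnableFuture<?> task(List<String> done, String name) {
        return new FutureTask<Void>(() -> {
            done.add(name);

            return null;
        });
    }

    /** Runs queued tasks in FIFO order until the marker completes; returns what ran before it. */
    static List<String> runUntilMarker() {
        LinkedBlockingQueue<RunnableFuture<?>> queue = new LinkedBlockingQueue<>();
        List<String> done = new ArrayList<>();

        queue.offer(task(done, "a"));
        queue.offer(task(done, "b"));

        // Marker task: does nothing itself, its completion signals that "a" and "b" have run.
        FutureTask<Void> marker = new FutureTask<>(() -> null);
        queue.offer(marker);

        // Offered after the marker, so it is still queued when the marker completes.
        queue.offer(task(done, "c"));

        // Single consumer; the marker is in the queue, so poll() cannot return null here.
        while (!marker.isDone())
            queue.poll().run();

        return done;
    }

    public static void main(String[] args) {
        System.out.println(runUntilMarker());
    }
}
```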







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623978543



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623824121



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       The TreeSet (in the suggested code) is required only to get sorted, readable output on failure, e.g.:
   ```
   java.lang.AssertionError: Keys must be the same on all nodes 
   Expected :[SNAPSHOT_PREFIX_0, SNAPSHOT_PREFIX_1, SNAPSHOT_PREFIX_10, SNAPSHOT_PREFIX_11, SNAPSHOT_PREFIX_12, SNAPSHOT_PREFIX_2, SNAPSHOT_PREFIX_3, SNAPSHOT_PREFIX_4, SNAPSHOT_PREFIX_5, SNAPSHOT_PREFIX_6, SNAPSHOT_PREFIX_7, SNAPSHOT_PREFIX_8, SNAPSHOT_PREFIX_9]
   Actual   :[SNAPSHOT_PREFIX_0, SNAPSHOT_PREFIX_1, SNAPSHOT_PREFIX_10, SNAPSHOT_PREFIX_100, SNAPSHOT_PREFIX_101, SNAPSHOT_PREFIX_102, SNAPSHOT_PREFIX_103, SNAPSHOT_PREFIX_104, SNAPSHOT_PREFIX_105, SNAPSHOT_PREFIX_106, SNAPSHOT_PREFIX_107, SNAPSHOT_PREFIX_108, SNAPSHOT_PREFIX_109, SNAPSHOT_PREFIX_11, SNAPSHOT_PREFIX_110, SNAPSHOT_PREFIX_111, SNAPSHOT_PREFIX_112, SNAPSHOT_PREFIX_113, SNAPSHOT_PREFIX_114, SNAPSHOT_PREFIX_115, SNAPSHOT_PREFIX_116, SNAPSHOT_PREFIX_117, SNAPSHOT_PREFIX_118, SNAPSHOT_PREFIX_119, SNAPSHOT_PREFIX_12, SNAPSHOT_PREFIX_120, SNAPSHOT_PREFIX_121, SNAPSHOT_PREFIX_122, SNAPSHOT_PREFIX_123, SNAPSHOT_PREFIX_124, SNAPSHOT_PREFIX_125, SNAPSHOT_PREFIX_126, SNAPSHOT_PREFIX_127, SNAPSHOT_PREFIX_128, SNAPSHOT_PREFIX_129, SNAPSHOT_PREFIX_13, SNAPSHOT_PREFIX_130, SNAPSHOT_PREFIX_131, SNAPSHOT_PREFIX_132, SNAPSHOT_PREFIX_133, SNAPSHOT_PREFIX_134, SNAPSHOT_PREFIX_135, SNAPSHOT_PREFIX_136, SNAPSHOT_PREFIX_137, SNAPSHOT_PREFIX_138, SNAPSHOT_PREFIX_139, SNAPSHOT_PREFIX_14, SNAPSHOT_PREFIX_140, SNAPSHOT_PREFIX_141, SNAPSHOT_PREFIX_142, SNAPSHOT_PREFIX_143, SNAPSHOT_PREFIX_144, SNAPSHOT_PREFIX_145, SNAPSHOT_PREFIX_146, SNAPSHOT_PREFIX_147, SNAPSHOT_PREFIX_148, SNAPSHOT_PREFIX_149, SNAPSHOT_PREFIX_15, SNAPSHOT_PREFIX_150, SNAPSHOT_PREFIX_151, SNAPSHOT_PREFIX_152, SNAPSHOT_PREFIX_153, SNAPSHOT_PREFIX_154, SNAPSHOT_PREFIX_155, SNAPSHOT_PREFIX_156, SNAPSHOT_PREFIX_157, SNAPSHOT_PREFIX_158, SNAPSHOT_PREFIX_159, SNAPSHOT_PREFIX_16, SNAPSHOT_PREFIX_160, SNAPSHOT_PREFIX_161, SNAPSHOT_PREFIX_162, SNAPSHOT_PREFIX_163, SNAPSHOT_PREFIX_164, SNAPSHOT_PREFIX_165, SNAPSHOT_PREFIX_166, SNAPSHOT_PREFIX_167, SNAPSHOT_PREFIX_168, SNAPSHOT_PREFIX_169, SNAPSHOT_PREFIX_17, SNAPSHOT_PREFIX_170, SNAPSHOT_PREFIX_18, SNAPSHOT_PREFIX_19, SNAPSHOT_PREFIX_2, SNAPSHOT_PREFIX_20, SNAPSHOT_PREFIX_21, SNAPSHOT_PREFIX_22, SNAPSHOT_PREFIX_23, SNAPSHOT_PREFIX_24, SNAPSHOT_PREFIX_25, SNAPSHOT_PREFIX_26, SNAPSHOT_PREFIX_27, SNAPSHOT_PREFIX_28, SNAPSHOT_PREFIX_29, SNAPSHOT_PREFIX_3, SNAPSHOT_PREFIX_30, SNAPSHOT_PREFIX_31, SNAPSHOT_PREFIX_32, SNAPSHOT_PREFIX_33, SNAPSHOT_PREFIX_34, SNAPSHOT_PREFIX_35, SNAPSHOT_PREFIX_36, SNAPSHOT_PREFIX_37, SNAPSHOT_PREFIX_38, SNAPSHOT_PREFIX_39, SNAPSHOT_PREFIX_4, SNAPSHOT_PREFIX_40, SNAPSHOT_PREFIX_41, SNAPSHOT_PREFIX_42, SNAPSHOT_PREFIX_43, SNAPSHOT_PREFIX_44, SNAPSHOT_PREFIX_45, SNAPSHOT_PREFIX_46, SNAPSHOT_PREFIX_47, SNAPSHOT_PREFIX_48, SNAPSHOT_PREFIX_49, SNAPSHOT_PREFIX_5, SNAPSHOT_PREFIX_50, SNAPSHOT_PREFIX_51, SNAPSHOT_PREFIX_52, SNAPSHOT_PREFIX_53, SNAPSHOT_PREFIX_54, SNAPSHOT_PREFIX_55, SNAPSHOT_PREFIX_56, SNAPSHOT_PREFIX_57, SNAPSHOT_PREFIX_58, SNAPSHOT_PREFIX_59, SNAPSHOT_PREFIX_6, SNAPSHOT_PREFIX_60, SNAPSHOT_PREFIX_61, SNAPSHOT_PREFIX_62, SNAPSHOT_PREFIX_63, SNAPSHOT_PREFIX_64, SNAPSHOT_PREFIX_65, SNAPSHOT_PREFIX_66, SNAPSHOT_PREFIX_67, SNAPSHOT_PREFIX_68, SNAPSHOT_PREFIX_69, SNAPSHOT_PREFIX_7, SNAPSHOT_PREFIX_70, SNAPSHOT_PREFIX_71, SNAPSHOT_PREFIX_72, SNAPSHOT_PREFIX_73, SNAPSHOT_PREFIX_74, SNAPSHOT_PREFIX_75, SNAPSHOT_PREFIX_76, SNAPSHOT_PREFIX_77, SNAPSHOT_PREFIX_78, SNAPSHOT_PREFIX_79, SNAPSHOT_PREFIX_8, SNAPSHOT_PREFIX_80, SNAPSHOT_PREFIX_81, SNAPSHOT_PREFIX_82, SNAPSHOT_PREFIX_83, SNAPSHOT_PREFIX_84, SNAPSHOT_PREFIX_85, SNAPSHOT_PREFIX_86, SNAPSHOT_PREFIX_87, SNAPSHOT_PREFIX_88, SNAPSHOT_PREFIX_89, SNAPSHOT_PREFIX_9, SNAPSHOT_PREFIX_90, SNAPSHOT_PREFIX_91, SNAPSHOT_PREFIX_92, SNAPSHOT_PREFIX_93, SNAPSHOT_PREFIX_94, SNAPSHOT_PREFIX_95, SNAPSHOT_PREFIX_96, SNAPSHOT_PREFIX_97, SNAPSHOT_PREFIX_98, SNAPSHOT_PREFIX_99]
   <Click to see difference>
   ```
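
To illustrate the point about sorted output, here is a minimal, self-contained sketch. It is not part of the PR: the class name `SortedKeysOutput` and the helper `sorted` are hypothetical. It shows that set equality does not depend on the set implementation, while a `TreeSet` prints its keys in lexicographic `String` order, which is what keeps a failed assertion readable.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustration only: a hypothetical class, not taken from the Ignite code base. */
public class SortedKeysOutput {
    /** Copies the keys into a TreeSet so that toString() lists them in sorted order. */
    static Set<String> sorted(Set<String> keys) {
        return new TreeSet<>(keys);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "SNAPSHOT_PREFIX_2", "SNAPSHOT_PREFIX_10", "SNAPSHOT_PREFIX_0", "SNAPSHOT_PREFIX_1");

        Set<String> hashed = new HashSet<>(keys);
        Set<String> ordered = sorted(hashed);

        // Equality compares contents only, so both sets are equal.
        System.out.println(hashed.equals(ordered));

        // HashSet iteration order is unspecified; TreeSet order is lexicographic.
        System.out.println(ordered);
    }
}
```

Note that the order is lexicographic rather than numeric, which matches the `_1, _10, _100, ...` sequence in the output above.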




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621294241



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
##########
@@ -85,6 +86,15 @@
     /** */
     public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** Metastorage cache directory to store data. */
+    public static final String META_STORAGE_DIR_NAME = "metastorage";

Review comment:
       I suggest renaming it to ``METASTORAGE_DIR_NAME`` (similar to the other constants).





[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622195440



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed when all the tasks prior to the pause task are completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {

Review comment:
       Maybe it would be better to name this method ``suspend``.
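
For readers unfamiliar with the pattern under discussion, here is a rough sketch of how a single-threaded queue worker can be suspended by enqueuing a blocking task. It is an assumption-laden illustration, not the actual `DmsDataWriterWorker` code: the class `SuspendableWorker` and its methods `start`, `submit` and `suspend` are hypothetical names. The test in this PR uses the same idea when it offers a latch-awaiting `FutureTask` to the worker queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;

/** Hypothetical stand-in for a single-threaded queue worker; not Ignite code. */
public class SuspendableWorker {
    /** Tasks are executed strictly in FIFO order by one thread. */
    private final LinkedBlockingQueue<RunnableFuture<?>> queue = new LinkedBlockingQueue<>();

    /** Starts the daemon thread that drains the queue. */
    public void start() {
        Thread t = new Thread(() -> {
            try {
                while (true)
                    queue.take().run();
            }
            catch (InterruptedException ignored) {
                // Interrupted while waiting for a task: stop the worker.
            }
        }, "sketch-worker");

        t.setDaemon(true);
        t.start();
    }

    /** Enqueues a task and returns a future that completes after the task has run. */
    public Future<?> submit(Runnable task) {
        FutureTask<Void> fut = new FutureTask<>(task, null);

        queue.offer(fut);

        return fut;
    }

    /** Enqueues a task that parks the worker until {@code resume} is counted down. */
    public Future<?> suspend(CountDownLatch resume) {
        FutureTask<Void> fut = new FutureTask<>(() -> {
            resume.await();

            return null;
        });

        queue.offer(fut);

        return fut;
    }

    public static void main(String[] args) throws Exception {
        SuspendableWorker worker = new SuspendableWorker();
        worker.start();

        CountDownLatch resume = new CountDownLatch(1);

        worker.suspend(resume);
        Future<?> update = worker.submit(() -> System.out.println("update applied"));

        // The update is queued behind the suspend task, so it cannot run yet.
        System.out.println("done while suspended: " + update.isDone());

        resume.countDown();
        update.get();

        System.out.println("done after resume: " + update.isDone());
    }
}
```

Because the queue is drained by one thread in FIFO order, everything offered after the suspend task waits until the latch is released, and no updates are lost in the meantime.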





[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621382449



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1058,6 +1056,8 @@ public static String cacheGroupName(File dir) {
             return name.substring(CACHE_GRP_DIR_PREFIX.length());
         else if (name.startsWith(CACHE_DIR_PREFIX))
             return name.substring(CACHE_DIR_PREFIX.length());
+        else if (name.startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       ``startsWith`` can be replaced with ``equals``.
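
A small sketch of why the exact match is the safer check. The class `DirNameMatch` and the directory name `metastorage-backup` are hypothetical and used only for illustration; the constant value is the one from the diff.

```java
/** Illustration only: a hypothetical class, not taken from the Ignite code base. */
public class DirNameMatch {
    /** Same value as the directory-name constant in the diff. */
    static final String METASTORAGE_DIR_NAME = "metastorage";

    /** Prefix check: also accepts any longer name that merely starts with the constant. */
    static boolean matchesByPrefix(String dirName) {
        return dirName.startsWith(METASTORAGE_DIR_NAME);
    }

    /** Exact check: accepts only the metastorage directory itself. */
    static boolean matchesExactly(String dirName) {
        return METASTORAGE_DIR_NAME.equals(dirName);
    }

    public static void main(String[] args) {
        // "metastorage-backup" is a hypothetical directory name.
        for (String dir : new String[] {"metastorage", "metastorage-backup"})
            System.out.println(dir + ": prefix=" + matchesByPrefix(dir) + ", exact=" + matchesExactly(dir));
    }
}
```

Unlike the cache directory prefixes checked in the preceding branches, the metastorage directory name has no variable suffix, so a prefix match only widens the set of accepted names.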





[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623979129



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {
+                    Map<String, SnapshotFutureTask> locMap = GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class,
+                        "locSnpTasks");
+
+                    // Fail the snapshot task with an exception; metastorage updates must still continue successfully.
+                    locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {}
+
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {}
+        });
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            fut::get,
+            IgniteCheckedException.class,
+            "Test exception");
+
+        stop.set(true);
+
+        // Load future must complete without exceptions, all metastorage keys must be written.
+        updFut.get();
+
+        Set<Integer> allKeys = IntStream.range(0, keyCnt.get()).boxed().collect(Collectors.toSet());
+
+        ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {

Review comment:
       Fixed.





[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623721326



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
##########
@@ -85,6 +86,19 @@
     /** */
     public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** Metastorage cache directory to store data. */
+    public static final String METASTORAGE_DIR_NAME = "metastorage";
+
+    /** Old special partition reserved for metastore space. */
+    public static final int OLD_METASTORE_PARTITION = 0x0;
+
+    /** Special partition reserved for metastore space. */
+    public static final int METASTORE_PARTITION = 0x1;
+
+    /** The set of all metastorage partitions. */
+    public static final Set<Integer> METASTORAGE_PARTITIONS =
+        new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION));

Review comment:
       ```suggestion
           Collections.unmodifiableSet(new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION)));
   ```
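
A brief sketch of what the wrapper buys for a public constant. The class `ReadOnlyPartitionSet` and its members are hypothetical stand-ins for the constants in the diff, not Ignite code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Illustration only: a hypothetical class, not taken from the Ignite code base. */
public class ReadOnlyPartitionSet {
    /** Hypothetical stand-ins for the two partition constants in the diff. */
    static final int OLD_PARTITION = 0x0;
    static final int PARTITION = 0x1;

    /** Read-only view: callers cannot add or remove elements. */
    static final Set<Integer> PARTITIONS =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(OLD_PARTITION, PARTITION)));

    /** @return {@code true} if the set accepted the modification attempt without throwing. */
    static boolean acceptsAdd(Set<Integer> set, int val) {
        try {
            set.add(val);

            return true;
        }
        catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A plain HashSet copy can be changed by any caller.
        Set<Integer> mutable = new HashSet<>(PARTITIONS);

        System.out.println("mutable copy accepts add: " + acceptsAdd(mutable, 2));
        System.out.println("constant accepts add: " + acceptsAdd(PARTITIONS, 2));
        System.out.println("constant size: " + PARTITIONS.size());
    }
}
```

A `public static final` field only fixes the reference; without the wrapper any code could still call `add` or `remove` on the shared set.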





[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623855057



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {
+                    Map<String, SnapshotFutureTask> locMap = GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class,
+                        "locSnpTasks");
+
+                    // Fail the snapshot task with an exception; metastorage updates must still continue successfully.
+                    locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {}
+
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {}
+        });
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            fut::get,
+            IgniteCheckedException.class,
+            "Test exception");
+
+        stop.set(true);
+
+        // Load future must complete without exceptions, all metastorage keys must be written.
+        updFut.get();
+
+        Set<Integer> allKeys = IntStream.range(0, keyCnt.get()).boxed().collect(Collectors.toSet());
+
+        ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+            @Override public void accept(String key, Serializable val) {
+                try {
+                    assertTrue(allKeys.remove(Integer.parseInt(key.replace(SNAPSHOT_PREFIX, ""))));
+                }
+                catch (Throwable t) {
+                    fail("Exception reading metastorage: " + t.getMessage());
+                }
+            }
+        });
+
+        assertTrue("Not all metastorage keys have been written: " + allKeys, allKeys.isEmpty());
+    }
+}

Review comment:
       ```suggestion
       public void testMetastorageUpdateOnSnapshotFail() throws Exception {
           AtomicInteger keyCnt = new AtomicInteger();
           AtomicBoolean stop = new AtomicBoolean();
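           // Updated from several updater threads, so the sorted set must be synchronized.
           Set<String> writtenKeys = Collections.synchronizedSet(new TreeSet<>());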
   
           IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
   
           IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
               while (!Thread.currentThread().isInterrupted() && !stop.get()) {
                   try {
                       String key = SNAPSHOT_PREFIX + keyCnt.getAndIncrement();
   
                       ignite.context().distributedMetastorage().write(key, "value");
   
                       writtenKeys.add(key);
                   }
                   catch (IgniteCheckedException e) {
                       throw new IgniteException(e);
                   }
               }
           }, 3, "dms-updater");
   
           ((GridCacheDatabaseSharedManager)ignite.context().cache().context().database())
               .addCheckpointListener(new CheckpointListener() {
                   @Override public void onMarkCheckpointBegin(Context ctx) {
                       if (ctx.progress().reason().contains(SNAPSHOT_NAME)) {
                           Map<String, SnapshotFutureTask> locMap =
                               GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class, "locSnpTasks");
   
                           // Fail the snapshot task with an exception; metastorage updates must continue successfully.
                           locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
                       }
                   }
   
                   @Override public void onCheckpointBegin(Context ctx) {}
   
                   @Override public void beforeCheckpointBegin(Context ctx) {}
               });
   
           IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
   
           GridTestUtils.assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Test exception");
   
           stop.set(true);
   
           // Load future must complete without exceptions, all metastorage keys must be written.
           updFut.get();
   
           Set<String> readKeys = new TreeSet<>();
   
           ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> readKeys.add(key));
   
           assertEquals("Not all metastorage keys have been written", writtenKeys, readKeys);
       }
   ```
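
       A minimal sketch of the key-collection pattern used in the suggestion above, assuming the set shared by the three updater threads should be thread-safe. The class name `ConcurrentKeysSketch` and the method `collect` are hypothetical and exist only for illustration; plain threads stand in for `GridTestUtils.runMultiThreadedAsync`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentKeysSketch {
    /** Writes keys from several threads into one sorted, thread-safe set (hypothetical helper). */
    static Set<String> collect(int threads, int keysPerThread) throws InterruptedException {
        Set<String> written = new ConcurrentSkipListSet<>();
        AtomicInteger keyCnt = new AtomicInteger();
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < keysPerThread; j++)
                    // Each key is unique because the counter is incremented atomically.
                    written.add("SNAPSHOT_PREFIX_" + keyCnt.getAndIncrement());
            }, "dms-updater-" + i);

            workers[i].start();
        }

        // Wait for all writers before reading the set.
        for (Thread t : workers)
            t.join();

        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Collected keys: " + collect(3, 1000).size());
    }
}
```

       A `TreeSet` gives no guarantees under concurrent `add` calls, so a lost key would make the final comparison fail for reasons unrelated to the snapshot.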




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622029051



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(0)),

Review comment:
       ```suggestion
           IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
   ```







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622026544



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.

Review comment:
       Future which will be completed **_when_**(?) all the tasks prior to the pause task are completed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623073179



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622026544



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.

Review comment:
       ``Future which will be completed **_will_** all the tasks prior to the pause task completed.``







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623073665



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, we might run the previous task (for example, ``restore``) twice if ``take()`` throws InterruptedException, and that's not good either.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, we might get an NPE or run the previous task twice if ``take()`` throws InterruptedException, and that's not good either.
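
       One possible way to avoid this, sketched under assumptions: keep the task in a local variable and go back to ``take()`` when it is interrupted, so a stale or null task is never run. The class `WorkerLoopSketch`, the method `drain` and the `STOP` sentinel are hypothetical stand-ins, not the worker's actual code, and swallowing the interrupt only mirrors the `ignore` in the diff.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerLoopSketch {
    /** Hypothetical sentinel that tells the loop to stop. */
    static final Runnable STOP = () -> {};

    /** Runs queued tasks until STOP is taken; returns the number of tasks executed. */
    static int drain(BlockingQueue<Runnable> queue) {
        int executed = 0;

        while (true) {
            // Local variable: no task from a previous iteration can leak into this one.
            Runnable task;

            try {
                task = queue.take();
            }
            catch (InterruptedException ignore) {
                // Nothing was taken, so there is nothing to run; retry.
                continue;
            }

            if (task == STOP)
                return executed;

            task.run();
            executed++;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger runs = new AtomicInteger();

        queue.offer(runs::incrementAndGet);
        queue.offer(runs::incrementAndGet);
        queue.offer(STOP);

        // Simulate an interrupt arriving before the first take().
        Thread.currentThread().interrupt();

        System.out.println(drain(queue) + " tasks executed, " + runs.get() + " runs");
    }
}
```

       With a field, the interrupted iteration would fall through to ``run()`` on whatever the field held before; with a local, the compiler rejects any path that reaches ``run()`` without an assigned task.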







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623830486



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {

Review comment:
       ```suggestion
                   if (ctx.progress().reason().contains(SNAPSHOT_NAME)) {
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621380848



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1012,7 +1009,8 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return Arrays.stream(dir.listFiles())
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+                f.getName().startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       This may be replaced with `f.getName().equals(MetaStorage.META_STORAGE_DIR_NAME)`.
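
       To illustrate the difference, here is a minimal sketch. It assumes the directory name constant is "metastorage" (the actual value of `MetaStorage.META_STORAGE_DIR_NAME` is not shown in this diff), and the class and method names are hypothetical. A prefix check also accepts sibling directories that merely begin with the same name, while an exact check does not.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DirNameMatchSketch {
    /** Hypothetical stand-in for MetaStorage.META_STORAGE_DIR_NAME (assumed value). */
    public static final String META_DIR = "metastorage";

    /** Keeps every name that begins with the metastorage directory name. */
    public static List<String> byPrefix(List<String> names) {
        return names.stream()
            .filter(n -> n.startsWith(META_DIR))
            .collect(Collectors.toList());
    }

    /** Keeps only the name that is exactly the metastorage directory name. */
    public static List<String> byEquals(List<String> names) {
        return names.stream()
            .filter(n -> n.equals(META_DIR))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "metastorage_backup" is an invented directory name used only for the demo.
        List<String> names = Arrays.asList("cache-default", "metastorage", "metastorage_backup");

        System.out.println("startsWith: " + byPrefix(names));
        System.out.println("equals:     " + byEquals(names));
    }
}
```

       Whether such look-alike directories can appear in a real work directory is not established by this thread; the sketch only shows why `equals` is the stricter check.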







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623824121



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       The TreeSet (in the suggested code) is required only for more readable error output, not for the comparison.
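
       The suggested code itself is not visible in this message, so the following is only a sketch of the idea with hypothetical names: compare the key sets as they are, and wrap them in a TreeSet only when building the failure message so the keys print in sorted order.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class SortedKeysMessageSketch {
    /** Set equality ignores iteration order, so no sorting is needed to compare. */
    public static boolean sameKeys(Set<String> expected, Set<String> actual) {
        return expected.equals(actual);
    }

    /** Sorting happens only here, to make the message easier to read. */
    public static String mismatchMessage(Set<String> expected, Set<String> actual) {
        return "Keys must be the same on all nodes [expected=" + new TreeSet<>(expected) +
            ", actual=" + new TreeSet<>(actual) + ']';
    }

    public static void main(String[] args) {
        Set<String> node0 = new HashSet<>(Arrays.asList("c", "a", "b"));
        Set<String> node1 = new HashSet<>(Arrays.asList("b", "a"));

        if (!sameKeys(node0, node1))
            System.out.println(mismatchMessage(node0, node1));
    }
}
```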







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621407189



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
##########
@@ -161,6 +171,13 @@ public MetaStorage(
         this.failureProcessor = cctx.kernalContext().failure();
     }
 
+    /**
+     * @return Count of metastorage partitions.
+     */
+    public static Set<Integer> partitions() {
+        return new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION));

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621382449



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1058,6 +1056,8 @@ public static String cacheGroupName(File dir) {
             return name.substring(CACHE_GRP_DIR_PREFIX.length());
         else if (name.startsWith(CACHE_DIR_PREFIX))
             return name.substring(CACHE_DIR_PREFIX.length());
+        else if (name.startsWith(MetaStorage.META_STORAGE_DIR_NAME))

Review comment:
       ``startsWith`` can be replaced with ``equals``.







[GitHub] [ignite] Mmuzaf merged pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf merged pull request #9047:
URL: https://github.com/apache/ignite/pull/9047


   





[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, if ``take`` throws ``InterruptedException`` we might either get an NPE (when no task has been assigned yet) or run the previous task a second time. Neither is acceptable.
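
       One possible way to avoid both outcomes, sketched under assumptions: hold the task in a local variable and retry when ``take`` is interrupted. The class name, the STOP sentinel, and the use of plain Runnable tasks are hypothetical simplifications, and this is not presented as the fix that was eventually merged.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalTaskLoopSketch {
    /** Hypothetical sentinel task that tells the loop to stop. */
    public static final Runnable STOP = () -> { };

    /** Runs queued tasks in order until the STOP sentinel is taken. */
    public static void drain(BlockingQueue<Runnable> queue) {
        while (true) {
            // Local variable: it cannot still hold a task from a previous iteration.
            Runnable task;

            try {
                task = queue.take();
            }
            catch (InterruptedException ignore) {
                // Nothing was taken, so retry instead of running a null or stale task.
                continue;
            }

            if (task == STOP)
                return;

            task.run();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger runs = new AtomicInteger();

        queue.offer(runs::incrementAndGet);
        queue.offer(runs::incrementAndGet);
        queue.offer(STOP);

        // Simulate an interrupt arriving before the first take().
        Thread.currentThread().interrupt();

        drain(queue);

        System.out.println("tasks run: " + runs.get());
    }
}
```

       The sketch swallows the interrupt in the same way the diff above does. Whether the worker should instead treat an interrupt as a stop signal is a separate design question that this thread does not settle.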







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621296629



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -525,76 +537,105 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
         for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
             futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
 
-        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
-            int grpId = e.getKey();
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+                String cacheDirName = cacheDirName(grpId);
 
-            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+                // Process partitions for a particular cache group.
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
 
-            if (gctx == null) {
-                acceptException(new IgniteCheckedException("Cache group context has not found " +
-                    "due to the cache group is stopped: " + grpId));
+                    Long partLen = partFileLengths.get(pair);
 
-                break;
+                    CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
+                        wrapExceptionIfStarted(() -> {
+                            snpSndr.sendPart(
+                                getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+                                cacheDirName,
+                                pair,
+                                partLen);
+
+                            // Stop partition writer.
+                            partDeltaWriters.get(pair).markPartitionProcessed();
+                        }),
+                        snpSndr.executor())
+                        // Wait for the completion of both futures - checkpoint end, copy partition.
+                        .runAfterBothAsync(cpEndFut,
+                            wrapExceptionIfStarted(() -> {
+                                File delta = partDeltaWriters.get(pair).deltaFile;
+
+                                try {
+                                    // Atomically creates a new, empty delta file if and only if
+                                    // a file with this name does not yet exist.
+                                    delta.createNewFile();
+                                }
+                                catch (IOException ex) {
+                                    throw new IgniteCheckedException(ex);
+                                }
+
+                                snpSndr.sendDelta(delta, cacheDirName, pair);
+
+                                boolean deleted = delta.delete();
+
+                                assert deleted;
+                            }),
+                            snpSndr.executor());
+
+                    futs.add(fut0);
+                }
             }
 
-            // Process partitions for a particular cache group.
-            for (int partId : e.getValue()) {
-                GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                CacheConfiguration<?, ?> ccfg = gctx.config();
+            int futsSize = futs.size();
 
-                assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+                .whenComplete((res, t) -> {
+                    assert t == null : "Exception must never be thrown since a wrapper is used " +
+                        "for each snapshot task: " + t;
 
-                String cacheDirName = cacheDirName(ccfg);
-                Long partLen = partFileLengths.get(pair);
-
-                CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
-                    wrapExceptionIfStarted(() -> {
-                        snpSndr.sendPart(
-                            getPartitionFile(pageStore.workDir(), cacheDirName, partId),
-                            cacheDirName,
-                            pair,
-                            partLen);
-
-                        // Stop partition writer.
-                        partDeltaWriters.get(pair).markPartitionProcessed();
-                    }),
-                    snpSndr.executor())
-                    // Wait for the completion of both futures - checkpoint end, copy partition.
-                    .runAfterBothAsync(cpEndFut,
-                        wrapExceptionIfStarted(() -> {
-                            File delta = partDeltaWriters.get(pair).deltaFile;
-
-                            try {
-                                // Atomically creates a new, empty delta file if and only if
-                                // a file with this name does not yet exist.
-                                delta.createNewFile();
-                            }
-                            catch (IOException ex) {
-                                throw new IgniteCheckedException(ex);
-                            }
+                    closeAsync();
+                });
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
 
-                            snpSndr.sendDelta(delta, cacheDirName, pair);
+    /**
+     * @param grpId Cache group id.
+     * @param parts Set of partitions to be processed.
+     * @param dirSupp Directory to init.
+     * @throws IgniteCheckedException If fails.
+     */
+    private void addPartitionWriters(int grpId, Set<Integer> parts, IgniteThrowableSupplier<String> dirSupp) throws IgniteCheckedException {

Review comment:
       Why do you need the dirSupp supplier? Why not pass dirName directly?







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, we cannot get an NPE, but we might run the previous task twice (for example, the ``restore`` task, if ``take()`` throws InterruptedException), and that's not good either.
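
One way to avoid re-running a stale task is to keep the taken task in a local variable and retry ``take()`` when it is interrupted. The following is a minimal sketch of that idea only, not the DmsDataWriterWorker code: the class name, the plain ``Runnable`` task type and the ``STOP`` sentinel are assumptions made for illustration. Whether a real worker should swallow the interrupt like this or treat it as a cancellation request is a separate decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a queue-draining worker loop. */
final class LocalTaskLoopSketch {
    /** Sentinel that ends the loop (an assumption of this sketch). */
    static final Runnable STOP = () -> { };

    /** Runs queued tasks until STOP is taken; returns how many tasks were run. */
    static int drain(BlockingQueue<Runnable> queue) {
        int executed = 0;

        while (true) {
            Runnable task; // Local variable: nothing survives from the previous iteration.

            try {
                task = queue.take();
            }
            catch (InterruptedException ignore) {
                // Nothing was taken, so retry instead of running an old task again.
                continue;
            }

            if (task == STOP)
                return executed;

            task.run();
            executed++;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<String> log = new ArrayList<>();

        queue.offer(() -> log.add("restore"));
        queue.offer(() -> log.add("update"));
        queue.offer(STOP);

        // Simulate an interrupt that arrives before the first take().
        Thread.currentThread().interrupt();

        System.out.println(drain(queue) + " tasks run: " + log);
    }
}
```

With a field instead of the local variable, the interrupted iteration would fall through to ``run()`` with whatever task the field held before.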







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622368937



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();

Review comment:
       Instead of calling ``.run()``, you can define ``pauseTask`` as
   ```
   private volatile Future<?> pauseTask = CompletableFuture.completedFuture(AWAIT);
   ```
   and cast it to ``RunnableFuture`` in the ``pause`` method, like this:
   ```
   updateQueue.offer((RunnableFuture<?>)(pauseTask = new FutureTask<>(() -> AWAIT)));
   ```
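
For comparison, here is a small self-contained sketch of the two ways to obtain an already-completed future. The class and method names are hypothetical, and ``AWAIT`` is modelled as a plain string because its real type is not visible in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical comparison of the two initialization styles. */
final class CompletedFutureSketch {
    /** Stand-in for the AWAIT marker used in the diff. */
    static final String AWAIT = "AWAIT";

    /** Style from the diff: create a task, then run it so that it is done. */
    static Future<String> viaFutureTask() {
        FutureTask<String> task = new FutureTask<>(() -> AWAIT);

        task.run();

        return task;
    }

    /** Suggested style: the future is already completed when it is created. */
    static Future<String> viaCompletedFuture() {
        return CompletableFuture.completedFuture(AWAIT);
    }

    /** Reads the result, converting checked exceptions to an unchecked one. */
    static String valueOf(Future<String> fut) {
        try {
            return fut.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Future<String> a = viaFutureTask();
        Future<String> b = viaCompletedFuture();

        System.out.println(a.isDone() + " " + valueOf(a));
        System.out.println(b.isDone() + " " + valueOf(b));
    }
}
```

Both futures report ``isDone()`` immediately and return the same value; the second form simply needs no extra ``run()`` call.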







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622229784



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       As far as I can see, if we catch InterruptedException, then ``curTask`` may be ``null``. Maybe it's better to add one big try/catch?
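
A minimal sketch of the "one big try/catch" shape, under the assumption that an interrupt should end the loop: the class name and the plain ``Runnable`` task type are hypothetical and are not taken from the PR. Because ``take()`` and ``run()`` sit in the same try block, ``run()`` is never reached when ``take()`` throws.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical worker loop with a single try/catch around the whole loop. */
final class SingleTryLoopSketch {
    /** Runs queued tasks until the thread is interrupted; returns how many were run. */
    static int runUntilInterrupted(BlockingQueue<Runnable> queue) {
        int executed = 0;

        try {
            while (true) {
                // If take() throws, control jumps straight to the catch block.
                Runnable task = queue.take();

                task.run();
                executed++;
            }
        }
        catch (InterruptedException e) {
            // Restore the flag so that the caller can still observe the interrupt.
            Thread.currentThread().interrupt();
        }

        return executed;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        queue.offer(() -> System.out.println("restore"));
        // The last task interrupts the worker thread, so the next take() throws.
        queue.offer(() -> Thread.currentThread().interrupt());

        int executed = runUntilInterrupted(queue);

        // Thread.interrupted() reads and clears the restored flag.
        System.out.println(executed + " tasks run, interrupted=" + Thread.interrupted());
    }
}
```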







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623822624



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       ```suggestion
           Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
           Set<String> keySet0 = new TreeSet<>();
           Set<String> keySet1 = new TreeSet<>();
   
           IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
           snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
   
           stopGrid(0);
   
           IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
           snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
   
           assertEquals("Keys must be the same on all nodes", keySet0, keySet1);
   ```
   
   TreeSet (in the suggested code) is used only for more readable error output (not for comparison).
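
A short sketch of why the set type does not affect the comparison: ``Set.equals`` depends only on the elements, so a ``HashSet`` and a ``TreeSet`` with the same keys are equal, while the ``TreeSet`` prints its keys in sorted order. The class and helper names below are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper showing set equality versus printed order. */
final class KeySetCompareSketch {
    /** Equality is defined by the Set contract and ignores iteration order. */
    static boolean sameKeys(Set<String> a, Set<String> b) {
        return a.equals(b);
    }

    /** Sorted text form, convenient in an assertion failure message. */
    static String sortedView(Collection<String> keys) {
        return new TreeSet<>(keys).toString();
    }

    public static void main(String[] args) {
        Set<String> hashed = new HashSet<>(
            Arrays.asList("SNAPSHOT_PREFIX_2", "SNAPSHOT_PREFIX_0", "SNAPSHOT_PREFIX_1"));
        Set<String> sorted = new TreeSet<>(hashed);

        System.out.println(sameKeys(hashed, sorted)); // prints "true"
        System.out.println(sortedView(hashed));
    }
}
```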







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed will all the tasks prior to the pause task completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, we can get an NPE or run the previous task twice (if InterruptedException is caught), and that doesn't look good either.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623834676



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                if (ctx.progress().reason().contains("Snapshot")) {
+                    Map<String, SnapshotFutureTask> locMap = GridTestUtils.getFieldValue(snp(ignite), IgniteSnapshotManager.class,
+                        "locSnpTasks");
+
+                    // Fail the snapshot task with an exception, all metastorage keys must be successfully continued.
+                    locMap.get(SNAPSHOT_NAME).acceptException(new IgniteCheckedException("Test exception"));
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {}
+
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {}
+        });
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            fut::get,
+            IgniteCheckedException.class,
+            "Test exception");
+
+        stop.set(true);
+
+        // Load future must complete without exceptions, all metastorage keys must be written.
+        updFut.get();
+
+        Set<Integer> allKeys = IntStream.range(0, keyCnt.get()).boxed().collect(Collectors.toSet());
+
+        ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+            @Override public void accept(String key, Serializable val) {
+                try {
+                    assertTrue(allKeys.remove(Integer.parseInt(key.replace(SNAPSHOT_PREFIX, ""))));
+                }
+                catch (Throwable t) {
+                    fail("Exception reading metastorage: " + t.getMessage());
+                }
+            }
+        });

Review comment:
       ```suggestion
           ignite.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> {
               try {
                   assertTrue(allKeys.remove(Integer.parseInt(key.replace(SNAPSHOT_PREFIX, ""))));
               }
               catch (Throwable t) {
                   fail("Exception reading metastorage: " + t.getMessage());
               }
           });
   ```
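The suggestion above swaps an anonymous `BiConsumer` for a lambda. As a minimal standalone sketch of why the two forms behave the same, the class below uses a plain `HashMap` and a hypothetical `iterate` helper as stand-ins for the metastorage call; the class name, the helper and the sample keys are assumptions for illustration only, not Ignite API.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

public class LambdaVsAnonymous {
    /** Same prefix as in the test under review. */
    public static final String PREFIX = "SNAPSHOT_PREFIX_";

    /** Hypothetical stand-in for iterate(prefix, consumer): visits entries whose key starts with the prefix. */
    public static void iterate(Map<String, Serializable> store, String prefix, BiConsumer<String, Serializable> cb) {
        store.forEach((k, v) -> {
            if (k.startsWith(prefix))
                cb.accept(k, v);
        });
    }

    /** Removes every visited key index from a copy of the expected set using an anonymous class. */
    public static Set<Integer> remainingWithAnonymous(Map<String, Serializable> store, Set<Integer> expected) {
        Set<Integer> left = new HashSet<>(expected);

        iterate(store, PREFIX, new BiConsumer<String, Serializable>() {
            @Override public void accept(String key, Serializable val) {
                left.remove(Integer.parseInt(key.replace(PREFIX, "")));
            }
        });

        return left;
    }

    /** Same logic written as a lambda, as in the suggestion. */
    public static Set<Integer> remainingWithLambda(Map<String, Serializable> store, Set<Integer> expected) {
        Set<Integer> left = new HashSet<>(expected);

        iterate(store, PREFIX, (key, val) -> left.remove(Integer.parseInt(key.replace(PREFIX, ""))));

        return left;
    }

    public static void main(String[] args) {
        Map<String, Serializable> store = new HashMap<>();
        store.put(PREFIX + 0, "value");
        store.put(PREFIX + 1, "value");
        store.put("other", "value");

        Set<Integer> expected = new HashSet<>(Arrays.asList(0, 1, 2));

        // Both calls leave only the index that was never written.
        System.out.println(remainingWithAnonymous(store, expected));
        System.out.println(remainingWithLambda(store, expected));
    }
}
```

Both variants capture the effectively final local `left`, so the change is purely syntactic.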




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622199929



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();

Review comment:
       What is the readLock guarding here?
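For context on the question, here is a minimal sketch of what a read lock from `java.util.concurrent.locks.ReentrantReadWriteLock` guarantees in general: while it is held, a write-lock attempt is refused, but further read acquisitions are admitted. It says nothing about what the `lock` field in `DistributedMetaStorageImpl` actually protects; the class and method names below are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockSketch {
    /** @return {@code true} if the write lock could be taken while a read lock is held. */
    public static boolean writerAdmittedWhileReading(ReentrantReadWriteLock lock) {
        lock.readLock().lock();

        try {
            // Non-blocking attempt: fails because the read hold count is non-zero.
            boolean acquired = lock.writeLock().tryLock();

            if (acquired)
                lock.writeLock().unlock();

            return acquired;
        }
        finally {
            lock.readLock().unlock();
        }
    }

    /** @return {@code true} if a second (here re-entrant) read acquisition succeeds while a read lock is held. */
    public static boolean secondReadAdmitted(ReentrantReadWriteLock lock) {
        lock.readLock().lock();

        try {
            boolean acquired = lock.readLock().tryLock();

            if (acquired)
                lock.readLock().unlock();

            return acquired;
        }
        finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        System.out.println("writer admitted: " + writerAdmittedWhileReading(lock));
        System.out.println("second read admitted: " + secondReadAdmitted(lock));
    }
}
```

So a read lock around `worker.flush()` would only matter if some other code path takes the corresponding write lock; whether that is the case here is exactly what the question asks.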







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621404398



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -525,76 +537,105 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
         for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
             futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
 
-        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
-            int grpId = e.getKey();
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+                String cacheDirName = cacheDirName(grpId);
 
-            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+                // Process partitions for a particular cache group.
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
 
-            if (gctx == null) {
-                acceptException(new IgniteCheckedException("Cache group context has not found " +
-                    "due to the cache group is stopped: " + grpId));
+                    Long partLen = partFileLengths.get(pair);
 
-                break;
+                    CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
+                        wrapExceptionIfStarted(() -> {
+                            snpSndr.sendPart(
+                                getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+                                cacheDirName,
+                                pair,
+                                partLen);
+
+                            // Stop partition writer.
+                            partDeltaWriters.get(pair).markPartitionProcessed();
+                        }),
+                        snpSndr.executor())
+                        // Wait for the completion of both futures - checkpoint end, copy partition.
+                        .runAfterBothAsync(cpEndFut,
+                            wrapExceptionIfStarted(() -> {
+                                File delta = partDeltaWriters.get(pair).deltaFile;
+
+                                try {
+                                    // Atomically creates a new, empty delta file if and only if
+                                    // a file with this name does not yet exist.
+                                    delta.createNewFile();
+                                }
+                                catch (IOException ex) {
+                                    throw new IgniteCheckedException(ex);
+                                }
+
+                                snpSndr.sendDelta(delta, cacheDirName, pair);
+
+                                boolean deleted = delta.delete();
+
+                                assert deleted;
+                            }),
+                            snpSndr.executor());
+
+                    futs.add(fut0);
+                }
             }
 
-            // Process partitions for a particular cache group.
-            for (int partId : e.getValue()) {
-                GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                CacheConfiguration<?, ?> ccfg = gctx.config();
+            int futsSize = futs.size();
 
-                assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+                .whenComplete((res, t) -> {
+                    assert t == null : "Exception must never be thrown since a wrapper is used " +
+                        "for each snapshot task: " + t;
 
-                String cacheDirName = cacheDirName(ccfg);
-                Long partLen = partFileLengths.get(pair);
-
-                CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
-                    wrapExceptionIfStarted(() -> {
-                        snpSndr.sendPart(
-                            getPartitionFile(pageStore.workDir(), cacheDirName, partId),
-                            cacheDirName,
-                            pair,
-                            partLen);
-
-                        // Stop partition writer.
-                        partDeltaWriters.get(pair).markPartitionProcessed();
-                    }),
-                    snpSndr.executor())
-                    // Wait for the completion of both futures - checkpoint end, copy partition.
-                    .runAfterBothAsync(cpEndFut,
-                        wrapExceptionIfStarted(() -> {
-                            File delta = partDeltaWriters.get(pair).deltaFile;
-
-                            try {
-                                // Atomically creates a new, empty delta file if and only if
-                                // a file with this name does not yet exist.
-                                delta.createNewFile();
-                            }
-                            catch (IOException ex) {
-                                throw new IgniteCheckedException(ex);
-                            }
+                    closeAsync();
+                });
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
 
-                            snpSndr.sendDelta(delta, cacheDirName, pair);
+    /**
+     * @param grpId Cache group id.
+     * @param parts Set of partitions to be processed.
+     * @param dirSupp Directory to init.
+     * @throws IgniteCheckedException If fails.
+     */
+    private void addPartitionWriters(int grpId, Set<Integer> parts, IgniteThrowableSupplier<String> dirSupp) throws IgniteCheckedException {

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623824121



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       The TreeSet (in the suggested code) is required only for readable output, not for the comparison.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623822624



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });

Review comment:
       ```suggestion
           Function<IgniteConfiguration, String> pathProv = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
           Set<String> keySet0 = new TreeSet<>();
           Set<String> keySet1 = new TreeSet<>();
   
           IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0), pathProv, SNAPSHOT_NAME, false);
           snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet0.add(key));
   
           stopGrid(0);
   
           IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1), pathProv, SNAPSHOT_NAME, false);
           snp1.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> keySet1.add(key));
   
           assertEquals("Keys must be the same on all nodes", keySet0, keySet1);
   ```
   
   The TreeSet in the suggested code is only there to make the error output more readable; it is not needed for the comparison.
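The point about `TreeSet` can be checked in isolation. The sketch below, with a hypothetical class name and made-up keys, relies only on `java.util` behavior: `Set.equals` ignores iteration order and implementation, while `TreeSet.toString` lists elements in sorted order, which makes an `assertEquals` failure message easier to read.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class KeySetComparison {
    /** Builds an unordered set of keys. */
    public static Set<String> hashed(String... keys) {
        return new HashSet<>(Arrays.asList(keys));
    }

    /** Builds a set of keys that iterates (and prints) in natural order. */
    public static Set<String> sorted(String... keys) {
        return new TreeSet<>(Arrays.asList(keys));
    }

    public static void main(String[] args) {
        Set<String> a = sorted("k2", "k0", "k1");
        Set<String> b = hashed("k0", "k1", "k2");

        // Equality depends only on the elements, not on the Set implementation.
        System.out.println(a.equals(b) && b.equals(a));

        // The sorted set prints its keys in natural order.
        System.out.println(a);
    }
}
```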







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621297816



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -453,28 +471,22 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl
 
                 CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
-                if (gctx == null) {
-                    throw new IgniteCheckedException("Cache group context has not found " +
-                        "due to the cache group is stopped: " + grpId);
-                }
-
-                for (int partId : e.getValue()) {
-                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
-
-                    PageStore store = pageStore.getStore(grpId, partId);
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group is stopped : " + grpId);
 
-                    partDeltaWriters.put(pair,
-                        new PageStoreSerialWriter(store,
-                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+                ccfgs.add(gctx.config());
+                addPartitionWriters(grpId, e.getValue(), () -> FilePageStoreManager.cacheDirName(gctx.config()));
+            }
 
-                    partFileLengths.put(pair, store.size());
-                }
+            if (withMetaStorage) {
+                processed.put(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions());
 
-                ccfgs.add(gctx.config());
+                addPartitionWriters(MetaStorage.METASTORAGE_CACHE_ID, MetaStorage.partitions(),
+                    () -> cacheDirName(MetaStorage.METASTORAGE_CACHE_ID));

Review comment:
       Why not pass ``MetaStorage.META_STORAGE_DIR_NAME`` directly instead of ``cacheDirName(MetaStorage.METASTORAGE_CACHE_ID)``?







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622197012



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause are processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();
+
+        try {
+            return worker.flush();
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pauseMetaStorage(IgniteInternalFuture<?> compFut) {

Review comment:
       Maybe it would be better to name this method ``suspend``, or at least to drop the duplicated ``MetaStorage`` from the name (the call currently reads ``metastorage.pauseMetaStorage(...)``).







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r621402696



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
##########
@@ -85,6 +86,15 @@
     /** */
     public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** Metastorage cache directory to store data. */
+    public static final String META_STORAGE_DIR_NAME = "metastorage";

Review comment:
       Fixed.







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623829310



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(Collections.singleton(0),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(Collections.singleton(1),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Iterator reads keys from the node heap map.
+        snp1.context().distributedMetastorage()
+            .iterate(SNAPSHOT_PREFIX, new BiConsumer<String, Serializable>() {
+                @Override public void accept(String key, Serializable value) {
+                    try {
+                        assertTrue("Keys must be the same on all nodes [key=" + key + ", all=" + allKeys + ']',
+                            allKeys.contains(key));
+                    }
+                    catch (Throwable t) {
+                        fail("Exception reading metastorage: " + t.getMessage());
+                    }
+                }
+            });
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateOnSnapshotFail() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage()
+                        .write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        GridCacheSharedContext<?, ?> cctx = ignite.context().cache().context();

Review comment:
       What is the point of declaring this variable?







[GitHub] [ignite] xtern commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
xtern commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r622232211



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put restore task to the queue, so it will be executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed when all the tasks prior to the pause task are completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = (DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - (len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Since ``curTask`` is currently a class field, an ``InterruptedException`` thrown by ``take()`` leaves it pointing at the previous task, so that task (for example, the ``restore`` task) might be run a second time, and that's not good either.
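
One way to avoid the stale reference is to hold the taken task in a local variable and skip the iteration when `take()` is interrupted. The sketch below is a simplified stand-in, not the PR's code: `WorkerLoopSketch`, `STOP`, and `drain` are hypothetical names, and plain `Runnable` tasks replace the worker's real task type. Like the loop in the diff, it swallows the interrupt, which is an assumption carried over rather than a recommendation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a queue-driven worker loop. */
class WorkerLoopSketch {
    /** Hypothetical marker task that tells the loop to exit. */
    static final Runnable STOP = () -> { };

    /**
     * Runs tasks from the queue until STOP is taken.
     *
     * @return Number of tasks executed (STOP itself is not counted).
     */
    static int drain(BlockingQueue<Runnable> queue) {
        int executed = 0;

        while (true) {
            Runnable task; // Local variable: no reference to a previous task can leak in.

            try {
                task = queue.take();
            }
            catch (InterruptedException ignore) {
                continue; // Nothing was taken, so there is nothing to run.
            }

            if (task == STOP)
                return executed;

            task.run();

            executed++;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger runs = new AtomicInteger();

        queue.offer(runs::incrementAndGet);
        queue.offer(runs::incrementAndGet);
        queue.offer(STOP);

        // Force the first take() to throw InterruptedException.
        Thread.currentThread().interrupt();

        System.out.println(drain(queue)); // prints "2"
        System.out.println(runs.get());   // prints "2"
    }
}
```

Because the compiler requires the local `task` to be assigned before use, the interrupted path cannot fall through to `run()` with an old value.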







[GitHub] [ignite] Mmuzaf commented on a change in pull request #9047: IGNITE-14572 Add metastorage to snapshot

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623073293



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(0)),

Review comment:
       Fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.processors.metastorage.persistence.DmsDataWriterWorker;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Cluster-wide snapshot with distributed metastorage test.
+ */
+public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest {
+    /** */
+    private static final String SNAPSHOT_PREFIX = "SNAPSHOT_PREFIX_";
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithMetastorage() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid();
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertEquals("value", snp.context().distributedMetastorage().read("key"));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testMetastorageUpdateDuringSnapshot() throws Exception {
+        AtomicInteger keyCnt = new AtomicInteger();
+        AtomicBoolean stop = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted() && !stop.get()) {
+                try {
+                    ignite.context().distributedMetastorage().write(SNAPSHOT_PREFIX + keyCnt.getAndIncrement(), "value");
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, 3, "dms-updater");
+
+        DmsDataWriterWorker worker = GridTestUtils.getFieldValue(ignite.context().distributedMetastorage(),
+            DistributedMetaStorageImpl.class, "worker");
+        LinkedBlockingQueue<RunnableFuture<?>> queue = GridTestUtils.getFieldValue(worker, DmsDataWriterWorker.class,
+            "updateQueue");
+
+        RunnableFuture<?> testTask = new FutureTask<>(() -> {
+            U.await(latch);
+
+            return null;
+        });
+
+        queue.offer(testTask);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> queue.size() > 10, getTestTimeout()));
+
+        ignite.context().cache().context().exchange()
+            .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                /** {@inheritDoc} */
+                @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                    latch.countDown();
+                }
+            });
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stop.set(true);
+        updFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp0 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(0)),
+            cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        Set<String> allKeys = new HashSet<>();
+        snp0.context().distributedMetastorage().iterate(SNAPSHOT_PREFIX, (key, val) -> allKeys.add(key));
+
+        stopGrid(0);
+
+        IgniteEx snp1 = startGridsFromSnapshot(new HashSet<>(Collections.singletonList(1)),

Review comment:
       Fixed.



