You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/11/22 07:39:57 UTC
[ignite] 02/02: IGNITE-14744 Restore snapshot taken on different topologies (#9539)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 0a7ccfa12dee96f4eee11fa2a78399fdde8890a8
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Nov 20 16:14:25 2021 +0300
IGNITE-14744 Restore snapshot taken on different topologies (#9539)
---
.../communication/AbstractTransmission.java | 4 +-
.../managers/communication/FileReceiver.java | 3 +-
.../managers/communication/GridIoManager.java | 28 +-
.../communication/GridIoMessageFactory.java | 4 +
.../communication/TransmissionHandler.java | 7 +-
.../processors/cache/CacheGroupContext.java | 2 +-
.../processors/cache/ClusterCachesInfo.java | 4 +-
.../internal/processors/cache/ExchangeActions.java | 4 +-
.../persistence/file/FilePageStoreManager.java | 30 +-
.../snapshot/AbstractSnapshotFutureTask.java | 136 +++
.../snapshot/AbstractSnapshotMessage.java | 108 +++
.../snapshot/IgniteSnapshotManager.java | 872 +++++++++++++++++--
.../snapshot/IgniteSnapshotVerifyException.java | 3 +
.../snapshot/SnapshotFilesFailureMessage.java | 134 +++
.../snapshot/SnapshotFilesRequestMessage.java | 174 ++++
...eption.java => SnapshotFinishedFutureTask.java} | 38 +-
.../persistence/snapshot/SnapshotFutureTask.java | 117 +--
.../persistence/snapshot/SnapshotMetadata.java | 57 +-
.../snapshot/SnapshotPartitionsVerifyHandler.java | 9 +-
.../snapshot/SnapshotResponseRemoteFutureTask.java | 172 ++++
.../snapshot/SnapshotRestoreProcess.java | 962 +++++++++++++++------
.../util/distributed/DistributedProcess.java | 5 +
.../apache/ignite/internal/util/lang/GridFunc.java | 2 +-
.../snapshot/AbstractSnapshotSelfTest.java | 90 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 1 +
.../IgniteClusterSnapshotRestoreBaseTest.java | 36 +
.../IgniteClusterSnapshotRestoreSelfTest.java | 77 +-
.../snapshot/IgniteSnapshotManagerSelfTest.java | 5 +-
.../snapshot/IgniteSnapshotRemoteRequestTest.java | 336 +++++++
.../IgniteSnapshotRestoreFromRemoteTest.java | 376 ++++++++
.../IgniteBasicWithPersistenceTestSuite.java | 4 +
...niteClusterSnapshotRestoreWithIndexingTest.java | 48 +-
32 files changed, 3303 insertions(+), 545 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
index bd1da54..aed45f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
@@ -57,9 +57,9 @@ abstract class AbstractTransmission implements Closeable {
int chunkSize
) {
A.notNull(meta, "Initial file meta cannot be null");
- A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
+ A.notNullOrEmpty(meta.name(), "Transmission name cannot be empty or null");
A.ensure(meta.offset() >= 0, "File start position cannot be negative");
- A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
+ A.ensure(meta.count() >= 0, "Total number of bytes to transfer can't be less than zero");
A.notNull(stopChecker, "Process stop checker cannot be null");
A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
index 6af3ca4..c826e4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
@@ -82,7 +82,8 @@ class FileReceiver extends TransmissionReceiver {
fileIo.position(meta.offset());
}
catch (IOException e) {
- throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
+ throw new IgniteException("Unable to open destination file. Receiver will be stopped: " +
+ file.getAbsolutePath(), e);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6b4395b..3476192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -970,7 +970,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (nodeId.equals(e.getValue().rmtNodeId)) {
it.remove();
- interruptRecevier(e.getValue(),
+ interruptReceiver(e.getValue(),
new ClusterTopologyCheckedException("Remote node left the grid. " +
"Receiver has been stopped : " + nodeId));
}
@@ -1179,7 +1179,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
for (ReceiverContext rctx : rcvs) {
- interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+ interruptReceiver(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+ ctx.localNodeId()));
}
}
@@ -1965,7 +1965,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
rcvCtx0 = rcvCtxs.remove(topic);
}
- interruptRecevier(rcvCtx0,
+ interruptReceiver(rcvCtx0,
new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
"on local node [nodeId=" + ctx.localNodeId() + ']'));
}
@@ -2787,7 +2787,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param rctx Receiver context to use.
* @param ex Exception to close receiver with.
*/
- private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+ private void interruptReceiver(ReceiverContext rctx, Exception ex) {
if (rctx == null)
return;
@@ -2800,9 +2800,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
rctx.lastState = rctx.lastState == null ?
new TransmissionMeta(ex) : rctx.lastState.error(ex);
- rctx.hnd.onException(rctx.rmtNodeId, ex);
+ if (X.hasCause(ex, TransmissionCancelledException.class)) {
+ if (log.isInfoEnabled())
+ log.info("Transmission receiver has been cancelled [rctx=" + rctx + ']');
+ }
+ else
+ U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex);
- U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
+ rctx.hnd.onException(rctx.rmtNodeId, ex);
}
}
@@ -2856,7 +2861,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
"It's not allowed to process different sessions over the same topic simultaneously. " +
"Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
- U.error(log, err);
+ U.error(log, "Error has been sent back to remote node. Receiver holds the local topic " +
+ "[topic=" + topic + ", rmtNodeId=" + rmtNodeId + ", ctx=" + rcvCtx + ']', err);
out.writeObject(new TransmissionMeta(err));
@@ -2881,17 +2887,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
receiveFromChannel(topic, rcvCtx, in, out, ch);
else
- interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+ interruptReceiver(rcvCtxs.remove(topic), rcvCtx.lastState.error());
}
finally {
rcvCtx.lock.unlock();
}
}
catch (Throwable t) {
- U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
-
// Do not remove receiver context here, since sender will recconect to get this error.
- interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
+ interruptReceiver(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
}
finally {
U.closeQuiet(in);
@@ -2991,7 +2995,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
@Override public void onTimeout() {
- interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
+ interruptReceiver(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
"waiting for the reconnect has been timeouted"));
}
});
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d39f7c3..567cf7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,6 +136,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -371,6 +373,8 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new);
+ factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new);
+ factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new);
// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
index da0dd69..a3c9f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -74,13 +74,16 @@ public interface TransmissionHandler {
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
- * @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
+ * @return Instance of read handler to process incoming data like the {@link FileChannel} manner.
*/
public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
/**
+ * The {@link TransmissionCancelledException} will be received by exception handler if the local transmission
+ * ends by the user interruption request.
+ *
* @param nodeId Remote node id on which the error occurred.
- * @param err The err of fail handling process.
+ * @param err The error of fail handling process.
*/
public void onException(UUID nodeId, Throwable err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 6b7b06c..7d6eec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -639,7 +639,7 @@ public class CacheGroupContext {
/**
* @return Cache shared context.
*/
- public GridCacheSharedContext shared() {
+ public GridCacheSharedContext<?, ?> shared() {
return ctx;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 2841e02..b8bd968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -995,6 +995,8 @@ public class ClusterCachesInfo {
DynamicCacheChangeRequest req,
String cacheName
) {
+ assert exchangeActions != null;
+
CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
IgniteCheckedException err = null;
@@ -1038,7 +1040,7 @@ public class ClusterCachesInfo {
if (err == null && req.restartId() == null) {
IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
- if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) {
+ if (snapshotMgr.isRestoring(ccfg)) {
err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
"currently being restored from a snapshot [cache=" + cacheName +
(ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 8736a88..3b93244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -446,10 +446,10 @@ public class ExchangeActions {
*
*/
public static class CacheGroupActionData {
- /** */
+ /** Cache group descriptor. */
private final CacheGroupDescriptor desc;
- /** */
+ /** Destroy flag. */
private final boolean destroy;
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7e2ff26..4601ec0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -134,6 +134,12 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
/** */
+ public static final Predicate<File> DATA_DIR_FILTER = dir ->
+ dir.getName().startsWith(CACHE_DIR_PREFIX) ||
+ dir.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+ dir.getName().equals(MetaStorage.METASTORAGE_DIR_NAME);
+
+ /** */
public static final String CACHE_DATA_FILENAME = "cache_data.dat";
/** */
@@ -1015,16 +1021,34 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (files == null)
return Collections.emptyList();
- return Arrays.stream(dir.listFiles())
+ return Arrays.stream(files)
.sorted()
.filter(File::isDirectory)
- .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
- f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
+ .filter(DATA_DIR_FILTER)
.filter(f -> names.test(cacheGroupName(f)))
.collect(Collectors.toList());
}
/**
+ * @param dir Directory to check.
+ * @param grpId Cache group id.
+ * @return Matching cache (group) directory for {@code grpId}, or {@code null} if none found or {@code dir} can't be listed.
+ */
+ public static File cacheDirectory(File dir, int grpId) {
+ File[] files = dir.listFiles();
+
+ if (files == null)
+ return null;
+
+ return Arrays.stream(files)
+ .filter(File::isDirectory)
+ .filter(DATA_DIR_FILTER)
+ .filter(f -> CU.cacheId(cacheGroupName(f)) == grpId)
+ .findAny()
+ .orElse(null);
+ }
+
+ /**
* @param partFileName Partition file name.
* @return Partition id.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
new file mode 100644
index 0000000..b4c65af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+ /** Shared context. */
+ protected final GridCacheSharedContext<?, ?> cctx;
+
+ /** Ignite logger. */
+ protected final IgniteLogger log;
+
+ /** Id of the node which caused the snapshot operation. */
+ protected final UUID srcNodeId;
+
+ /** Unique identifier of snapshot process. */
+ protected final String snpName;
+
+ /** Snapshot working directory on file system (the {@code snpName} subdirectory of the given working directory). */
+ protected final File tmpSnpWorkDir;
+
+ /** IO factory which will be used for creating snapshot delta-writers. */
+ protected final FileIOFactory ioFactory;
+
+ /** Snapshot data sender. */
+ @GridToStringExclude
+ protected final SnapshotSender snpSndr;
+
+ /** Partitions to be processed (presumably cache group id -> partition ids; verify against callers). */
+ protected final Map<Integer, Set<Integer>> parts;
+
+ /** An exception which has occurred during snapshot processing. */
+ protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ /**
+ * @param cctx Shared context.
+ * @param srcNodeId Id of the node which caused the snapshot task creation.
+ * @param snpName Unique identifier of snapshot process; must be non-null.
+ * @param tmpWorkDir Parent working directory; its {@code snpName} subdirectory holds intermediate snapshot results.
+ * @param ioFactory Factory for working with snapshot files.
+ * @param snpSndr Snapshot data sender; must be non-null and provide a non-null executor.
+ * @param parts Partitions to be processed.
+ */
+ protected AbstractSnapshotFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ String snpName,
+ File tmpWorkDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ assert snpName != null : "Snapshot name cannot be empty or null.";
+ assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+ assert snpSndr.executor() != null : "Executor service must be not null.";
+
+ this.cctx = cctx;
+ this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+ this.srcNodeId = srcNodeId;
+ this.snpName = snpName;
+ this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+ this.ioFactory = ioFactory;
+ this.snpSndr = snpSndr;
+ this.parts = parts;
+ }
+
+ /**
+ * @return Snapshot name.
+ */
+ public String snapshotName() {
+ return snpName;
+ }
+
+ /**
+ * @return Id of the node which triggered this operation.
+ */
+ public UUID sourceNodeId() {
+ return srcNodeId;
+ }
+
+ /**
+ * Initiates snapshot task.
+ *
+ * @return {@code true} if task started by this call.
+ */
+ public abstract boolean start();
+
+ /**
+ * @param th An exception which occurred during snapshot processing.
+ */
+ public abstract void acceptException(Throwable th);
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ // Cancellation is reported through acceptException() rather than thrown; this method always returns true.
+ acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
+ "by external process [snpName=" + snpName + ']'));
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AbstractSnapshotFutureTask.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
new file mode 100644
index 0000000..e76ca16
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Base class for snapshot messages: holds the unique request id and writes/reads it as the first message field.
+ */
+abstract class AbstractSnapshotMessage implements Message {
+ /** Unique request id. */
+ private String reqId;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ protected AbstractSnapshotMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param reqId Unique request id; asserted to satisfy {@code U.alphanumericUnderscore}.
+ */
+ protected AbstractSnapshotMessage(String reqId) {
+ assert U.alphanumericUnderscore(reqId) : reqId;
+
+ this.reqId = reqId;
+ }
+
+ /**
+ * @return Unique request id.
+ */
+ public String requestId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ if (writer.state() == 0) {
+ if (!writer.writeString("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (reader.state() == 0) {
+ reqId = reader.readString("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(AbstractSnapshotMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AbstractSnapshotMessage.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7fb70ca..7207d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -36,6 +36,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -50,26 +51,40 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.SnapshotEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
@@ -78,6 +93,12 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -117,6 +138,7 @@ import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -153,6 +175,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
import static org.apache.ignite.internal.MarshallerContextImpl.resolveMappingFileStoreWorkDir;
import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
@@ -171,6 +194,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
@@ -209,6 +233,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Text Reason for checkpoint to start snapshot operation. */
public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+ /** Name prefix for each remote snapshot operation. */
+ public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
/** Default snapshot directory for loading remote snapshots. */
public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
@@ -216,8 +243,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
/** Error message to finalize snapshot tasks. */
- public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " +
- "is stopping";
+ public static final String SNP_NODE_STOPPING_ERR_MSG = "The operation is cancelled due to the local node is stopping";
/** Metastorage key to save currently running snapshot. */
public static final String SNP_RUNNING_KEY = "snapshot-running";
@@ -240,6 +266,27 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Total number of thread to perform local snapshot. */
private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+ /** Default snapshot topic to receive snapshots from remote node. */
+ private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+ /** File transmission parameter of cache group id. */
+ private static final String SNP_GRP_ID_PARAM = "grpId";
+
+ /** File transmission parameter of cache partition id. */
+ private static final String SNP_PART_ID_PARAM = "partId";
+
+ /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
+ private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+ /** File transmission parameter of a cache directory which is currently sending its partitions. */
+ private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+ /** Snapshot parameter name for a file transmission. */
+ private static final String RQ_ID_NAME_PARAM = "rqId";
+
+ /** Total snapshot files count which receiver should expect to receive. */
+ private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
/**
* Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
* It is important to have only one buffer per thread (instead of creating each buffer per
@@ -249,7 +296,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
private final ThreadLocal<ByteBuffer> locBuff;
/** Map of registered cache snapshot processes and their corresponding contexts. */
- private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, AbstractSnapshotFutureTask<?>> locSnpTasks = new ConcurrentHashMap<>();
/** Lock to protect the resources is used. */
private final GridBusyLock busyLock = new GridBusyLock();
@@ -278,6 +325,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Local snapshot sender factory. */
private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+ /** Remote snapshot sender factory. */
+ private BiFunction<String, UUID, SnapshotSender> rmtSndrFactory = this::remoteSnapshotSenderFactory;
+
/** Main snapshot directory to save created snapshots. */
private volatile File locSnpDir;
@@ -314,6 +364,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Snapshot operation handlers. */
private final SnapshotHandlers handlers = new SnapshotHandlers();
+ /** Manager to receive responses of remote snapshot requests. */
+ private final SequentialRemoteSnapshotManager snpRmtMgr;
+
/**
* @param ctx Kernal context.
*/
@@ -331,6 +384,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+
+ // Manage remote snapshots.
+ snpRmtMgr = new SequentialRemoteSnapshotManager();
}
/**
@@ -405,6 +461,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
"the configured via IgniteConfiguration snapshot working path.");
cctx.exchange().registerExchangeAwareComponent(this);
+
ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
@@ -427,19 +484,23 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
endSnpProc.start(snpReq.requestId(), snpReq);
}
- for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+ for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
if (sctx.sourceNodeId().equals(leftNodeId) ||
(reqNodeLeft && snpReq.snapshotName().equals(sctx.snapshotName())))
sctx.acceptException(new ClusterTopologyCheckedException(err));
}
restoreCacheGrpProc.onNodeLeft(leftNodeId);
+ snpRmtMgr.onNodeLeft(leftNodeId);
}
}
finally {
busyLock.leaveBusy();
}
}, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
+ cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
}
/** {@inheritDoc} */
@@ -450,11 +511,13 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
// Try stop all snapshot processing if not yet.
- for (SnapshotFutureTask sctx : locSnpTasks.values())
+ for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
locSnpTasks.clear();
+ snpRmtMgr.stop();
+
synchronized (snpOpMux) {
if (clusterSnpFut != null) {
clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -466,6 +529,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
if (snpRunner != null)
snpRunner.shutdownNow();
+ cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+ cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
if (discoLsnr != null)
cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
@@ -639,7 +705,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
parts.put(grpId, null);
}
- IgniteInternalFuture<Set<GroupPartitionId>> task0;
+ IgniteInternalFuture<?> task0;
if (parts.isEmpty() && !withMetaStorage)
task0 = new GridFinishedFuture<>(Collections.emptySet());
@@ -650,7 +716,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
withMetaStorage,
locSndrFactory.apply(req.snapshotName()));
- if (withMetaStorage) {
+ if (withMetaStorage && task0 instanceof SnapshotFutureTask) {
((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
.suspend(((SnapshotFutureTask)task0).started());
}
@@ -681,7 +747,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
blts,
- fut.result());
+ (Set<GroupPartitionId>)fut.result());
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
U.marshal(marsh, meta, out);
@@ -903,12 +969,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/**
* Check if the cache or group with the specified name is currently being restored from the snapshot.
*
- * @param cacheName Cache name.
- * @param grpName Cache group name.
+ * @param ccfg Cache configuration.
* @return {@code True} if the cache or group with the specified name is being restored.
*/
- public boolean isRestoring(String cacheName, @Nullable String grpName) {
- return restoreCacheGrpProc.isRestoring(cacheName, grpName);
+ public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+ return restoreCacheGrpProc.isRestoring(ccfg);
}
/**
@@ -980,7 +1045,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
busyLock.enterBusy();
try {
- for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+ for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
if (sctx.snapshotName().equals(name))
sctx.cancel();
}
@@ -1415,7 +1480,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
SnapshotOperationRequest snpReq = clusterSnpReq;
- SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName());
+ AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
if (task == null)
return;
@@ -1427,7 +1492,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
try {
long start = U.currentTimeMillis();
- task.started().get();
+ ((SnapshotFutureTask)task).started().get();
if (log.isInfoEnabled()) {
log.info("Finished waiting for a synchronized checkpoint under topology lock " +
@@ -1441,12 +1506,45 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
/**
+ * Requests partition files of the given snapshot from a remote node. The request is submitted to the
+ * sequential remote snapshot manager and the created receiver future is returned to the caller.
+ *
+ * @param rmtNodeId The remote node to connect to.
+ * @param snpName Snapshot name to request (must be alphanumeric with underscores).
+ * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Received partition handler.
+ * @return Future which tracks the remote snapshot transmission result.
+ * @throws IgniteCheckedException If the remote node is not in the topology or does not support snapshots.
+ */
+ public IgniteInternalFuture<Void> requestRemoteSnapshotFiles(
+ UUID rmtNodeId,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) throws IgniteCheckedException {
+ assert U.alphanumericUnderscore(snpName) : snpName;
+ assert partHnd != null;
+
+ ClusterNode rmtNode = cctx.discovery().node(rmtNodeId);
+
+ if (rmtNode == null) {
+ throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+ "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+ }
+
+ if (!nodeSupports(rmtNode, PERSISTENCE_CACHE_SNAPSHOT))
+ throw new IgniteCheckedException("Snapshot on remote node is not supported: " + rmtNode.id());
+
+ RemoteSnapshotFilesRecevier fut = new RemoteSnapshotFilesRecevier(this, rmtNodeId, snpName, parts, stopChecker, partHnd);
+
+ snpRmtMgr.submit(fut);
+
+ return fut;
+ }
+
+ /**
* @param grps List of cache groups which will be destroyed.
*/
public void onCacheGroupsStopped(List<Integer> grps) {
- for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+ for (AbstractSnapshotFutureTask<?> sctx : F.view(locSnpTasks.values(), t -> t instanceof SnapshotFutureTask)) {
Set<Integer> retain = new HashSet<>(grps);
- retain.retainAll(sctx.affectedCacheGroups());
+
+ retain.retainAll(((SnapshotFutureTask)sctx).affectedCacheGroups());
if (!retain.isEmpty()) {
sctx.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
@@ -1572,59 +1670,63 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param snpSndr Factory which produces snapshot receiver instance.
* @return Snapshot operation task which should be registered on checkpoint to run.
*/
- SnapshotFutureTask registerSnapshotTask(
+ AbstractSnapshotFutureTask<?> registerSnapshotTask(
String snpName,
UUID srcNodeId,
Map<Integer, Set<Integer>> parts,
boolean withMetaStorage,
SnapshotSender snpSndr
) {
- if (!busyLock.enterBusy()) {
- return new SnapshotFutureTask(
- new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
- }
-
- try {
- if (locSnpTasks.containsKey(snpName))
- return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+ AbstractSnapshotFutureTask<?> task = registerTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName,
+ tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff));
- SnapshotFutureTask snpFutTask;
+ if (!withMetaStorage) {
+ for (Integer grpId : parts.keySet()) {
+ if (!cctx.cache().isEncrypted(grpId))
+ continue;
- SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
- snpFutTask = new SnapshotFutureTask(cctx,
- srcNodeId,
- snpName,
- tmpWorkDir,
- ioFactory,
- snpSndr,
- parts,
- withMetaStorage,
- locBuff));
+ task.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
+ " but doesn't include metastore. Metastore is required because it contains encryption keys " +
+ "required to start with encrypted caches contained in the snapshot."));
- if (prev != null)
- return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+ return task;
+ }
+ }
- if (!withMetaStorage) {
- for (Integer grpId : parts.keySet()) {
- if (!cctx.cache().isEncrypted(grpId))
- continue;
+ return task;
+ }
- snpFutTask.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
- " but doesn't include metastore. Metastore is required because it contains encryption keys " +
- "required to start with encrypted caches contained in the snapshot."));
+ /**
+ * @param task Snapshot operation task to be executed.
+ * @return Snapshot operation task which should be registered on checkpoint to run.
+ */
+ private AbstractSnapshotFutureTask<?> registerTask(String rqId, AbstractSnapshotFutureTask<?> task) {
+ if (!busyLock.enterBusy()) {
+ return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+ cctx.localNodeId() + ']'));
+ }
- return snpFutTask;
- }
+ try {
+ if (locSnpTasks.containsKey(rqId)) {
+ return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+ rqId));
}
+ AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, task);
+
+ if (prev != null)
+ return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+ rqId));
+
if (log.isInfoEnabled()) {
log.info("Snapshot task has been registered on local node [sctx=" + this +
+ ", task=" + task.getClass().getSimpleName() +
", topVer=" + cctx.discovery().topologyVersionEx() + ']');
}
- snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+ task.listen(f -> locSnpTasks.remove(rqId));
- return snpFutTask;
+ return task;
}
finally {
busyLock.leaveBusy();
@@ -1645,6 +1747,26 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return locSndrFactory;
}
+ /**
+ * Replaces the factory used to create snapshot senders for remote snapshot requests.
+ *
+ * @param factory Factory which produces {@link RemoteSnapshotSender} implementation.
+ */
+ void remoteSnapshotSenderFactory(BiFunction<String, UUID, SnapshotSender> factory) {
+ rmtSndrFactory = factory;
+ }
+
+    /**
+     * Creates a sender which transmits snapshot files to the given node over the default snapshot topic.
+     *
+     * @param rqId Request id.
+     * @param nodeId Node id.
+     * @return Snapshot sender related to given node id.
+     */
+    RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
+        String relNodePath = databaseRelativePath(pdsSettings.folderName());
+
+        GridIoManager.TransmissionSender transSndr =
+            cctx.gridIO().openTransmissionSender(nodeId, DFLT_INITIAL_SNAPSHOT_TOPIC);
+
+        return new RemoteSnapshotSender(log, snpRunner, relNodePath, transSndr, rqId);
+    }
+
/** Snapshot finished successfully or already restored. Key can be removed. */
private void removeLastMetaStorageKey() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
@@ -1700,6 +1822,18 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
/**
+     * Looks through the registered local tasks for a remote-response task whose source is the given node.
+     *
+     * @param nodeId Remote node id on which requests has been registered.
+     * @return Snapshot future related to given node id, or {@code null} if there is no such task.
+     */
+    AbstractSnapshotFutureTask<?> lastScheduledSnapshotResponseRemoteTask(UUID nodeId) {
+        for (AbstractSnapshotFutureTask<?> candidate : locSnpTasks.values()) {
+            if (!(candidate instanceof SnapshotResponseRemoteFutureTask))
+                continue;
+
+            if (candidate.sourceNodeId().equals(nodeId))
+                return candidate;
+        }
+
+        return null;
+    }
+
+ /**
* @return Relative configured path of persistence data storage directory for the local node.
* Example: {@code snapshotWorkDir/db/IgniteNodeName0}
*/
@@ -1883,6 +2017,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return new SnapshotHandlerResult<>(hnd.invoke(ctx), null, ctx.localNode());
}
catch (Exception e) {
+ U.error(null, "Error invoking snapshot handler", e);
+
return new SnapshotHandlerResult<>(null, e, ctx.localNode());
}
}
@@ -2115,6 +2251,642 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission result. */
+ private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+ /** Unique id of the remote snapshot request; also used as the name of its temporary directory. */
+ private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Ignite snapshot manager. */
+ private final IgniteSnapshotManager snpMgr;
+
+ /** Initial message to send request. */
+ private final SnapshotFilesRequestMessage initMsg;
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which shows how many partitions are left to be received ({@code -1} until the total count is known). */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * Creates a receiver for a single remote snapshot request. The temporary directory of the request is resolved
+ * under the snapshot manager's temporary working directory by the request id, and the initial request message
+ * is prepared; nothing is sent until {@code init()} is called.
+ *
+ * @param snpMgr Ignite snapshot manager.
+ * @param rmtNodeId Remote node to request snapshot from.
+ * @param snpName Snapshot name to request.
+ * @param parts Cache group and partitions to request.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotFilesRecevier(
+ IgniteSnapshotManager snpMgr,
+ UUID rmtNodeId,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+ initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+ this.snpMgr = snpMgr;
+ this.rmtNodeId = rmtNodeId;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /**
+ * Initiate handler by sending request message.
+ * <p>
+ * Does nothing if this future is already done. Otherwise the prepared request message is sent to the remote
+ * node as an ordered message on the default snapshot topic. Any failure (including the remote node being
+ * absent from the topology) completes this future with the error directly, without notifying the
+ * partition handler.
+ */
+ public synchronized void init() {
+ if (isDone())
+ return;
+
+ try {
+ ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+ if (rmtNode == null) {
+ throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+ "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+ }
+
+ snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ initMsg,
+ SYSTEM_POOL,
+ Long.MAX_VALUE,
+ true);
+
+ if (snpMgr.log.isInfoEnabled()) {
+ snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+ ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+ }
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * Fails this future with the given exception unless it is already done. The partition handler is notified
+ * first with a {@code null} file and the exception; if the handler itself throws, that throwable is attached
+ * to the original exception as suppressed.
+ *
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * Passes a received partition file to the partition handler and decrements the counter of partitions left.
+ * Does nothing if this future is already done.
+ *
+ * @param part Received file which needs to be handled.
+ * @throws TransmissionCancelledException If the stop checker reports that the process is interrupted or
+ * the partition handler throws {@link IgniteInterruptedException}.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+ try {
+ partHnd.accept(part, null);
+ }
+ catch (IgniteInterruptedException e) {
+ throw new TransmissionCancelledException(e.getMessage());
+ }
+
+ partsLeft.decrementAndGet();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The temporary directory of this request is deleted before the future is completed, regardless of
+ * whether it completes normally or with an error.
+ */
+ @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+ return Objects.equals(reqId, future.reqId);
+ }
+
+ /** {@inheritDoc} The hash code is derived from the request id only. */
+ @Override public int hashCode() {
+ return reqId.hashCode();
+ }
+
+ /** {@inheritDoc} The string form is built by {@code S.toString} for this class. */
+ @Override public String toString() {
+ return S.toString(RemoteSnapshotFilesRecevier.class, this);
+ }
+ }
+
+ /**
+ * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+ * is processed asynchronously, but strictly one at a time.
+ */
+ private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+ /** A task currently being executed and must be explicitly finished. */
+ private volatile RemoteSnapshotFilesRecevier active;
+
+ /** Queue of asynchronous tasks to execute. */
+ private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+ /** {@code true} if the node is stopping. */
+ private volatile boolean stopping;
+
+ /**
+ * Schedules a new receiver. If the manager is stopping, the given receiver, the active one (if any) and all
+ * queued receivers are failed with the node-stopping error. Otherwise the receiver becomes active and is
+ * initialized right away when there is no active receiver or the active one is already done; in all other
+ * cases it is put into the queue. When a receiver becomes active, a listener is attached that schedules
+ * the next queued receiver on its completion.
+ *
+ * @param next New task for scheduling.
+ */
+ public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+ assert next != null;
+
+ if (stopping) {
+ next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+ if (active != null)
+ active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+ RemoteSnapshotFilesRecevier r;
+
+ while ((r = queue.poll()) != null)
+ r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+ return;
+ }
+
+ RemoteSnapshotFilesRecevier curr = active;
+
+ if (curr == null || curr.isDone()) {
+ next.listen(f -> scheduleNext());
+
+ active = next;
+
+ next.init();
+ }
+ else
+ queue.offer(next);
+ }
+
+        /** Takes the next queued receiver, if there is one, and submits it for execution. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier head = queue.poll();
+
+            if (head != null)
+                submit(head);
+        }
+
+ /**
+ * Stopping handler.
+ * <p>
+ * Marks the manager as stopping and then blocks until every currently scheduled receiver (queued or active)
+ * is done. A checked exception raised while waiting is rethrown wrapped into {@link IgniteException}.
+ * NOTE(review): this method does not complete the receivers itself; presumably they are completed by
+ * other code paths (e.g. a later {@code submit()} or transmission callbacks) - confirm that the wait
+ * cannot hang.
+ */
+ public void stop() {
+ stopping = true;
+
+ Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+ GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+ try {
+ for (IgniteInternalFuture<Void> fut : futs)
+ stopFut.add(fut);
+
+ stopFut.markInitialized().get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+        /**
+         * Fails every scheduled receiver that requested a snapshot from the node which left the cluster.
+         *
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> scheduled = activeTasks();
+
+            ClusterTopologyCheckedException leftErr = new ClusterTopologyCheckedException(
+                "The node from which a snapshot has been " + "requested left the grid");
+
+            for (RemoteSnapshotFilesRecevier rcv : scheduled) {
+                if (rcv.rmtNodeId.equals(nodeId))
+                    rcv.acceptException(leftErr);
+            }
+        }
+
+        /**
+         * Collects the queued receivers together with the active one (when present) into a new set.
+         *
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> scheduled = new HashSet<>(queue);
+
+            // Read the volatile field once to avoid a race between the null check and the usage.
+            RemoteSnapshotFilesRecevier cur = active;
+
+            if (cur != null)
+                scheduled.add(cur);
+
+            return scheduled;
+        }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Handles two kinds of messages received on the snapshot topic; the message is silently dropped if the busy
+ * lock cannot be entered.
+ * <ul>
+ * <li>{@code SnapshotFilesRequestMessage}: a previously registered response task for the same requesting
+ * node (if any) is cancelled, then a new response task is registered under the request id and started.
+ * If the task completes with an error, or processing the request throws, a
+ * {@code SnapshotFilesFailureMessage} with the error text is sent back to the requesting node.</li>
+ * <li>{@code SnapshotFilesFailureMessage}: if it matches the request id of the active receiver and carries
+ * an error message, the active receiver is failed; otherwise the message is logged as stale and
+ * ignored.</li>
+ * </ul>
+ * Any throwable escaping the handling above is logged and passed to the failure processor as a critical error.
+ */
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ if (msg instanceof SnapshotFilesRequestMessage) {
+ SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+ String rqId = reqMsg0.requestId();
+ String snpName = reqMsg0.snapshotName();
+
+ try {
+ synchronized (this) {
+ AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+ if (task != null) {
+ // Task will also be removed from local map due to the listener on future done.
+ task.cancel();
+
+ log.info("Snapshot request has been cancelled due to another request received " +
+ "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+ }
+ }
+
+ AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+ new SnapshotResponseRemoteFutureTask(cctx,
+ nodeId,
+ snpName,
+ tmpWorkDir,
+ ioFactory,
+ rmtSndrFactory.apply(rqId, nodeId),
+ reqMsg0.parts()));
+
+ task.listen(f -> {
+ if (f.error() == null)
+ return;
+
+ U.error(log, "Failed to process request of creating a snapshot " +
+ "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+ try {
+ cctx.gridIO().sendToCustomTopic(nodeId,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+ SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException ex0) {
+ U.error(log, "Fail to send the response message with processing snapshot request " +
+ "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+ }
+ });
+
+ task.start();
+ }
+ catch (Throwable t) {
+ U.error(log, "Error processing snapshot file request message " +
+ "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', t);
+
+ cctx.gridIO().sendToCustomTopic(nodeId,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ new SnapshotFilesFailureMessage(reqMsg0.requestId(), t.getMessage()),
+ SYSTEM_POOL);
+ }
+ }
+ else if (msg instanceof SnapshotFilesFailureMessage) {
+ SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+ RemoteSnapshotFilesRecevier task = active;
+
+ if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+ if (log.isInfoEnabled()) {
+ log.info("A stale snapshot response message has been received. Will be ignored " +
+ "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+ }
+
+ return;
+ }
+
+ if (respMsg0.errorMessage() != null) {
+ task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+ "on the remote node with an error: " + respMsg0.errorMessage()));
+ }
+ }
+ }
+ catch (Throwable e) {
+ U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Completes the active receiver successfully, if there is one. It is asserted that no partitions are left
+ * to be received and that the transmission came from the node the snapshot was requested from.
+ */
+ @Override public void onEnd(UUID nodeId) {
+ RemoteSnapshotFilesRecevier task = active;
+
+ if (task == null)
+ return;
+
+ assert task.partsLeft.get() == 0 : task;
+ assert task.rmtNodeId.equals(nodeId);
+
+ if (log.isInfoEnabled()) {
+ log.info("Requested snapshot from remote node has been fully received " +
+ "[rqId=" + task.reqId + ", task=" + task + ']');
+ }
+
+ task.onDone((Void)null);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Fails the active receiver (if any) with the given transmission error.
+ */
+ @Override public void onException(UUID nodeId, Throwable ex) {
+ RemoteSnapshotFilesRecevier task = active;
+
+ if (task == null)
+ return;
+
+ assert task.rmtNodeId.equals(nodeId);
+
+ task.acceptException(ex);
+ }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Resolves the local path an incoming partition file is written to: the partition file name inside the
+         * sender's node path and cache directory (both taken from the transmission parameters), located under the
+         * temporary directory of the active request. The first call for a request also initializes the counter
+         * of partitions expected to be received.
+         *
+         * @throws TransmissionCancelledException If there is no active request, the active request is already done
+         *      or the transmission belongs to another request.
+         * @throws IgniteException If the snapshot manager is stopping or the target directory cannot be resolved.
+         */
+        @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+            Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+            String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+            String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+            String rqId = (String)fileMeta.params().get(RQ_ID_NAME_PARAM);
+            Integer partsCnt = (Integer)fileMeta.params().get(SNP_PARTITIONS_CNT);
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + fileMeta + ", task=" + task + ']');
+            }
+
+            assert task.reqId.equals(rqId) && task.rmtNodeId.equals(nodeId) :
+                "Another transmission in progress [task=" + task + ", nodeId=" + nodeId + ']';
+
+            // The busy lock may be refused (manager is stopping). It must be released only after
+            // a successful acquisition, so bail out before entering the try-finally section.
+            if (!busyLock.enterBusy())
+                throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+
+            try {
+                // Only the first file of a request sets the expected number of partitions.
+                task.partsLeft.compareAndSet(-1, partsCnt);
+
+                File cacheDir = U.resolveWorkDirectory(task.dir.toString(),
+                    Paths.get(rmtDbNodePath, cacheDirName).toString(),
+                    false);
+
+                return Paths.get(cacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+ /** {@inheritDoc} Always throws {@link UnsupportedOperationException}: receiving files by chunks is not supported. */
+ @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+ throw new UnsupportedOperationException("Loading file by chunks is not supported: " + nodeId);
+ }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns a consumer which hands a fully received partition file over to the active receiver. The
+         * consumer re-checks that the receiver captured here is still the active one and is not done before
+         * accepting the file.
+         *
+         * @throws TransmissionCancelledException If there is no active request, the active request is already done
+         *      or the transmission belongs to another request.
+         */
+        @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+            Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+            Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+            String rqId = (String)initMeta.params().get(RQ_ID_NAME_PARAM);
+
+            assert grpId != null;
+            assert partId != null;
+            assert rqId != null;
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + initMeta + ", task=" + task + ']');
+            }
+
+            return new Consumer<File>() {
+                @Override public void accept(File file) {
+                    RemoteSnapshotFilesRecevier task0 = active;
+
+                    if (task0 == null || !task0.equals(task) || task0.isDone()) {
+                        throw new TransmissionCancelledException("Snapshot request is cancelled [rqId=" + rqId +
+                            ", grpId=" + grpId + ", partId=" + partId + ']');
+                    }
+
+                    // The busy lock may be refused (manager is stopping). It must be released only after
+                    // a successful acquisition, so bail out before entering the try-finally section.
+                    if (!busyLock.enterBusy())
+                        throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+
+                    try {
+                        if (stopping)
+                            throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+
+                        task0.acceptFile(file);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            };
+        }
+ }
+
+ /**
+ * This executor does not run tasks on a single dedicated thread; instead it runs them one after another,
+ * possibly on different threads of the wrapped executor. Sequential processing is important for some
+ * {@link SnapshotSender}s because all their sub-tasks may share a single socket channel to send data to.
+ */
+ private static class SequentialExecutorWrapper implements Executor {
+ /** Ignite logger. */
+ private final IgniteLogger log;
+
+ /** Queue of task to execute. */
+ private final Queue<Runnable> tasks = new ArrayDeque<>();
+
+ /** Delegate executor. */
+ private final Executor executor;
+
+ /** Currently running task. */
+ private volatile Runnable active;
+
+ /** If wrapped executor is shutting down. */
+ private volatile boolean stopping;
+
+ /**
+ * @param log Ignite logger; a logger for this class is derived from it.
+ * @param executor Executor to run tasks on.
+ */
+ public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
+ this.log = log.getLogger(SequentialExecutorWrapper.class);
+ this.executor = executor;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The task is queued wrapped so that the next queued task is scheduled once it finishes, whether it
+ * completes normally or throws. If no task is currently active, scheduling starts immediately.
+ */
+ @Override public synchronized void execute(final Runnable r) {
+ assert !stopping : "Task must be cancelled prior to the wrapped executor is shutting down.";
+
+ tasks.offer(() -> {
+ try {
+ r.run();
+ }
+ finally {
+ scheduleNext();
+ }
+ });
+
+ if (active == null)
+ scheduleNext();
+ }
+
+ /**
+ * Takes the next queued task (if any), makes it the active one and passes it to the wrapped executor.
+ * If the wrapped executor rejects the task, all queued tasks are dropped, this wrapper is marked as
+ * stopping and a warning is logged.
+ */
+ private synchronized void scheduleNext() {
+ if ((active = tasks.poll()) != null) {
+ try {
+ executor.execute(active);
+ }
+ catch (RejectedExecutionException e) {
+ tasks.clear();
+
+ stopping = true;
+
+ log.warning("Task is outdated. Wrapped executor is shutting down.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Snapshot sender which transmits requested partition files to a remote node.
+ */
+ private static class RemoteSnapshotSender extends SnapshotSender {
+ /** The sender which sends files to remote node. */
+ private final GridIoManager.TransmissionSender sndr;
+
+ /** Snapshot request id. */
+ private final String rqId;
+
+ /** Local node persistent directory with consistent id. */
+ private final String relativeNodePath;
+
+ /** The number of cache partition files expected to be processed. */
+ private int partsCnt;
+
+ /**
+ * @param log Ignite logger.
+ * @param exec Executor to run sending tasks on; it is wrapped so that the tasks are executed sequentially.
+ * @param relativeNodePath Local node persistent directory with consistent id.
+ * @param sndr File sender instance.
+ * @param rqId Snapshot request id.
+ */
+ public RemoteSnapshotSender(
+ IgniteLogger log,
+ Executor exec,
+ String relativeNodePath,
+ GridIoManager.TransmissionSender sndr,
+ String rqId
+ ) {
+ super(log, new SequentialExecutorWrapper(log, exec));
+
+ this.sndr = sndr;
+ this.rqId = rqId;
+ this.relativeNodePath = relativeNodePath;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Remembers the number of partition files to be sent (it is passed to the receiver with every file) and
+ * fails with {@link IgniteException} if the relative node path is empty.
+ */
+ @Override protected void init(int partsCnt) {
+ this.partsCnt = partsCnt;
+
+ if (F.isEmpty(relativeNodePath))
+ throw new IgniteException("Relative node path cannot be empty.");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Sends the whole partition file to the remote node with the {@code FILE} transmission policy, attaching the
+ * request parameters to the transmission. A cancelled transmission is only logged and not rethrown; any other
+ * failure is logged and rethrown wrapped into {@link IgniteException}.
+ * NOTE(review): {@link InterruptedException} is wrapped without restoring the thread's interrupt flag -
+ * confirm whether callers rely on the flag.
+ */
+ @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long len) {
+ try {
+ assert part.exists();
+ assert len > 0 : "Requested partitions has incorrect file length " +
+ "[pair=" + pair + ", cacheDirName=" + cacheDirName + ']';
+
+ sndr.send(part, 0, len, transmissionParams(rqId, cacheDirName, pair), TransmissionPolicy.FILE);
+
+ if (log.isInfoEnabled()) {
+ log.info("Partition file has been send [part=" + part.getName() + ", pair=" + pair +
+ ", length=" + len + ']');
+ }
+ }
+ catch (TransmissionCancelledException e) {
+ if (log.isInfoEnabled()) {
+ log.info("Transmission partition file has been interrupted [part=" + part.getName() +
+ ", pair=" + pair + ']');
+ }
+ }
+ catch (IgniteCheckedException | InterruptedException | IOException e) {
+ U.error(log, "Error sending partition file [part=" + part.getName() + ", pair=" + pair +
+ ", length=" + len + ']', e);
+
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} Always throws {@link UnsupportedOperationException}: sending delta files is not supported by this sender. */
+ @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+ throw new UnsupportedOperationException("Sending files by chunks of data is not supported: " + delta.getAbsolutePath());
+ }
+
+ /**
+ * Builds the parameters attached to every transmitted partition file: group id, partition id, relative node
+ * path, cache directory name, request id and the total number of partitions to expect.
+ *
+ * @param rqId Snapshot request id.
+ * @param cacheDirName Cache directory name.
+ * @param pair Cache group id with corresponding partition id.
+ * @return Map of params.
+ */
+ private Map<String, Serializable> transmissionParams(String rqId, String cacheDirName,
+ GroupPartitionId pair) {
+ Map<String, Serializable> params = new HashMap<>();
+
+ params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
+ params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
+ params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
+ params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
+ params.put(RQ_ID_NAME_PARAM, rqId);
+ params.put(SNP_PARTITIONS_CNT, partsCnt);
+
+ return params;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close0(@Nullable Throwable th) {
+ U.closeQuiet(sndr);
+
+ if (th == null) {
+ if (log.isInfoEnabled())
+ log.info("The remote snapshot sender closed normally [snpName=" + rqId + ']');
+ }
+ else {
+ U.warn(log, "The remote snapshot sender closed due to an error occurred while processing " +
+ "snapshot operation [snpName=" + rqId + ']', th);
+ }
+ }
+ }
+
/**
* Snapshot sender which writes all data to local directory.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
index bcaea42..ac42ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
/**
* Compound snapshot verification exception from the nodes where the verification process executed.
@@ -36,6 +37,8 @@ public class IgniteSnapshotVerifyException extends IgniteException {
* @param map Map of received exceptions.
*/
 public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
+ // The first received exception is handed to the parent constructor; all of them are kept in the map below.
+ // NOTE(review): presumably the map is never empty here - F.first() on an empty collection likely yields null; verify.
+ super(F.first(map.values()));
+
 exs.putAll(map);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
new file mode 100644
index 0000000..3cb9056
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message indicating a failure occurred during processing snapshot files request.
+ */
+public class SnapshotFilesFailureMessage extends AbstractSnapshotMessage {
+ /** Snapshot response message type (value is {@code 179}). */
+ public static final short TYPE_CODE = 179;
+
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Exception message which is occurred during snapshot request processing. */
+ private String errMsg;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public SnapshotFilesFailureMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param reqId Request id to which response related to.
+ * @param errMsg Response error message.
+ */
+ public SnapshotFilesFailureMessage(String reqId, String errMsg) {
+ super(reqId);
+
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Response error message.
+ */
+ public String errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * @param errMsg Response error message.
+ * @return {@code this} for chaining.
+ */
+ public SnapshotFilesFailureMessage errorMessage(String errMsg) {
+ this.errMsg = errMsg;
+
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the parent part first, then the header (once), then the {@code errMsg} field at writer state 1.
+ * Returns {@code false} as soon as any of these steps reports that it has not completed.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ // Fields owned by the parent class go first; stop if they are not fully written yet.
+ // NOTE(review): presumably the parent occupies state 0 - confirm in AbstractSnapshotMessage.
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // State 1: the error message.
+ if (writer.state() == 1) {
+ if (!writer.writeString("errMsg", errMsg))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the parent part first, then the {@code errMsg} field at reader state 1, mirroring {@code writeTo}.
+ * Returns {@code false} as soon as a step reports that it has not completed.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Fields owned by the parent class are read first.
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ // State 1: the error message.
+ if (reader.state() == 1) {
+ errMsg = reader.readString("errMsg");
+
+ // The state is advanced only when the reader reports the last read as finished.
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(SnapshotFilesFailureMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotFilesFailureMessage.class, this, super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
new file mode 100644
index 0000000..65d1345
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
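+/**
+ * Request message for snapshot files: carries the snapshot name and the requested cache group partitions.
+ */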
+public class SnapshotFilesRequestMessage extends AbstractSnapshotMessage {
+ /** Snapshot request message type (value is {@code 178}). */
+ public static final short TYPE_CODE = 178;
+
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Snapshot name to request. */
+ private String snpName;
+
+ /** Map of cache group ids and corresponding set of its partition ids. */
+ @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+ private Map<Integer, int[]> parts;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public SnapshotFilesRequestMessage() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request for the given partitions of the named snapshot. Each partition set is converted
+ * to an {@code int[]} for the direct message format.
+ *
+ * @param reqId Unique request id.
+ * @param snpName Snapshot name.
+ * @param parts Map of cache group ids and corresponding set of its partition ids to be snapshot.
+ */
+ public SnapshotFilesRequestMessage(String reqId, String snpName, Map<Integer, Set<Integer>> parts) {
+ super(reqId);
+
+ assert parts != null && !parts.isEmpty();
+
+ Map<Integer, int[]> converted = new HashMap<>();
+
+ parts.forEach((grpId, partIds) -> converted.put(grpId, U.toIntArray(partIds)));
+
+ this.snpName = snpName;
+ this.parts = converted;
+ }
+
+ /**
+ * Converts the stored {@code int[]} values back to sets on every call.
+ *
+ * @return The demanded cache group partitions per each cache group; a group whose stored array is empty
+ * is mapped to {@code null}.
+ */
+ public Map<Integer, Set<Integer>> parts() {
+ Map<Integer, Set<Integer>> res = new HashMap<>();
+
+ parts.forEach((grpId, partIds) -> {
+ Set<Integer> ids = null;
+
+ if (partIds.length != 0)
+ ids = Arrays.stream(partIds).boxed().collect(Collectors.toSet());
+
+ res.put(grpId, ids);
+ });
+
+ return res;
+ }
+
+ /**
+ * @return Requested snapshot name.
+ */
+ public String snapshotName() {
+ return snpName;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the parent part first, then the header (once), then {@code snpName} at writer state 1 and
+ * {@code parts} at writer state 2. Returns {@code false} as soon as a step reports that it has not completed.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ // Fields owned by the parent class go first; stop if they are not fully written yet.
+ // NOTE(review): presumably the parent occupies state 0 - confirm in AbstractSnapshotMessage.
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // State 1: the snapshot name.
+ if (writer.state() == 1) {
+ if (!writer.writeString("snpName", snpName))
+ return false;
+
+ writer.incrementState();
+ }
+
+ // State 2: cache group id -> partition ids.
+ if (writer.state() == 2) {
+ if (!writer.writeMap("parts", parts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the parent part first, then {@code snpName} at reader state 1 and {@code parts} at reader state 2,
+ * mirroring {@code writeTo}. Returns {@code false} as soon as a step reports that it has not completed.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Fields owned by the parent class are read first.
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ // State 1: the snapshot name.
+ if (reader.state() == 1) {
+ snpName = reader.readString("snpName");
+
+ // The state is advanced only when the reader reports the last read as finished.
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ // State 2: cache group id -> partition ids.
+ if (reader.state() == 2) {
+ parts = reader.readMap("parts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(SnapshotFilesRequestMessage.class);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Three fields are serialized by this message: the part written by the parent class plus
+ * {@code snpName} (state 1) and {@code parts} (state 2).
+ */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotFilesRequestMessage.class, this, super.toString());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
index bcaea42..561bb16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
@@ -17,32 +17,26 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-
-/**
- * Compound snapshot verification exception from the nodes where the verification process executed.
- */
-public class IgniteSnapshotVerifyException extends IgniteException {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Map of received exceptions. */
- private final Map<ClusterNode, Exception> exs = new HashMap<>();
+import org.apache.ignite.IgniteCheckedException;
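+/** Snapshot task which is already completed with the given exception on creation; {@link #start()} always returns {@code false}. */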
+public class SnapshotFinishedFutureTask extends AbstractSnapshotFutureTask<Void> {
/**
- * @param map Map of received exceptions.
+ * @param e Finished snapshot task future with particular exception.
*/
- public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
- exs.putAll(map);
+ public SnapshotFinishedFutureTask(IgniteCheckedException e) {
+ super(null, null, null, null, null, null, null);
+
+ onDone(e);
}
- /**
- * @return Map of received exceptions.
- */
- public Map<ClusterNode, Exception> exceptions() {
- return exs;
+ /** {@inheritDoc} */
+ @Override public boolean start() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acceptException(Throwable th) {
+ onDone(th);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 2fefe35..97f7d72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -47,7 +46,6 @@ import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -87,33 +85,20 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
/**
- *
+ * The requested map of cache groups and their partitions to include into a snapshot is represented as {@code Map<Integer, Set<Integer>>}.
+ * If the set of partitions is {@code null} then all OWNING partitions for the given cache group will be included into the snapshot.
+ * In this case, if all partitions have the OWNING state, the index partition will also be included.
+ * <p>
+ * If partitions for a particular cache group are not provided then they will be collected and added
+ * on checkpoint under the write-lock.
*/
-class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implements CheckpointListener {
- /** Shared context. */
- private final GridCacheSharedContext<?, ?> cctx;
-
+class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener {
/** File page store manager for accessing cache group associated files. */
private final FilePageStoreManager pageStore;
- /** Ignite logger. */
- private final IgniteLogger log;
-
- /** Node id which cause snapshot operation. */
- private final UUID srcNodeId;
-
- /** Unique identifier of snapshot process. */
- private final String snpName;
-
- /** Snapshot working directory on file system. */
- private final File tmpSnpWorkDir;
-
/** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
private final ThreadLocal<ByteBuffer> locBuff;
- /** IO factory which will be used for creating snapshot delta-writers. */
- private final FileIOFactory ioFactory;
-
/**
* The length of file size per each cache partition file.
* Partition has value greater than zero only for partitions in OWNING state.
@@ -135,20 +120,6 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
*/
private final List<CacheConfigurationSender> ccfgSndrs = new CopyOnWriteArrayList<>();
- /** Snapshot data sender. */
- @GridToStringExclude
- private final SnapshotSender snpSndr;
-
- /**
- * Requested map of cache groups and its partitions to include into snapshot. If array of partitions
- * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
- * In this case if all of partitions have OWNING state the index partition also will be included.
- * <p>
- * If partitions for particular cache group are not provided that they will be collected and added
- * on checkpoint under the write lock.
- */
- private final Map<Integer, Set<Integer>> parts;
-
/** {@code true} if all metastorage data must be also included into snapshot. */
private final boolean withMetaStorage;
@@ -167,38 +138,16 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
/** Future which will be completed when task requested to be closed. Will be executed on system pool. */
private volatile CompletableFuture<Void> closeFut;
- /** An exception which has been occurred during snapshot processing. */
- private final AtomicReference<Throwable> err = new AtomicReference<>();
-
/** Flag indicates that task already scheduled on checkpoint. */
private final AtomicBoolean started = new AtomicBoolean();
/**
- * @param e Finished snapshot task future with particular exception.
- */
- public SnapshotFutureTask(IgniteCheckedException e) {
- assert e != null : "Exception for a finished snapshot task must be not null";
-
- cctx = null;
- pageStore = null;
- log = null;
- snpName = null;
- srcNodeId = null;
- tmpSnpWorkDir = null;
- snpSndr = null;
-
- err.set(e);
- startedFut.onDone(e);
- onDone(e);
- parts = null;
- withMetaStorage = false;
- ioFactory = null;
- locBuff = null;
- }
-
- /**
- * @param snpName Unique identifier of snapshot task.
- * @param ioFactory Factory to working with delta as file storage.
+ * @param cctx Shared context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param snpName Unique identifier of snapshot process.
+ * @param tmpWorkDir Working directory for intermediate snapshot results.
+ * @param ioFactory Factory to working with snapshot files.
+ * @param snpSndr Factory which produces snapshot receiver instance.
* @param parts Map of cache groups and its partitions to include into snapshot, if set of partitions
* is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
*/
@@ -213,47 +162,20 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
boolean withMetaStorage,
ThreadLocal<ByteBuffer> locBuff
) {
+ super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+
assert snpName != null : "Snapshot name cannot be empty or null.";
assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
assert snpSndr.executor() != null : "Executor service must be not null.";
assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead.";
- this.parts = parts;
this.withMetaStorage = withMetaStorage;
- this.cctx = cctx;
this.pageStore = (FilePageStoreManager)cctx.pageStore();
- this.log = cctx.logger(SnapshotFutureTask.class);
- this.snpName = snpName;
- this.srcNodeId = srcNodeId;
- this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
- this.snpSndr = snpSndr;
- this.ioFactory = ioFactory;
this.locBuff = locBuff;
}
/**
- * @return Snapshot name.
- */
- public String snapshotName() {
- return snpName;
- }
-
- /**
- * @return Node id which triggers this operation.
- */
- public UUID sourceNodeId() {
- return srcNodeId;
- }
-
- /**
- * @return Type of snapshot operation.
- */
- public Class<? extends SnapshotSender> type() {
- return snpSndr.getClass();
- }
-
- /**
* @return Set of cache groups included into snapshot operation.
*/
public Set<Integer> affectedCacheGroups() {
@@ -263,7 +185,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
/**
* @param th An exception which occurred during snapshot processing.
*/
- public void acceptException(Throwable th) {
+ @Override public void acceptException(Throwable th) {
if (th == null)
return;
@@ -323,7 +245,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
*
* @return {@code true} if task started by this call.
*/
- public boolean start() {
+ @Override public boolean start() {
if (stopping())
return false;
@@ -676,8 +598,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
/** {@inheritDoc} */
@Override public boolean cancel() {
- acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
- "by external process [snpName=" + snpName + ']'));
+ super.cancel();
try {
closeAsync().get();
@@ -725,7 +646,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(SnapshotFutureTask.class, this);
+ return S.toString(SnapshotFutureTask.class, this, super.toString());
}
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index f5fb6cba..20cb5d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+import java.io.IOException;
+import java.io.InvalidObjectException;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -28,6 +31,7 @@ import java.util.UUID;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Snapshot metadata file.
@@ -67,7 +71,7 @@ public class SnapshotMetadata implements Serializable {
* since for instance, due to the node filter there is no cache data on node.
*/
@GridToStringInclude
- private final Map<Integer, Set<Integer>> locParts = new HashMap<>();
+ private transient Map<Integer, Set<Integer>> locParts = new HashMap<>();
/**
* @param rqId Unique snapshot request id.
@@ -155,7 +159,56 @@ public class SnapshotMetadata implements Serializable {
* saved on the local node because some of them may be skipped due to cache node filter).
*/
public Map<Integer, Set<Integer>> partitions() {
- return locParts;
+ return Collections.unmodifiableMap(locParts);
+ }
+
+ /**
+ * Saves the partitions map (a {@code transient} field) to a stream after the default serialized state.
+ * <p>
+ * Layout: number of cache groups, then for each group its id, the number of its partitions and
+ * the partition ids themselves, all written as {@code int} values.
+ *
+ * @param s Stream to write to.
+ * @throws java.io.IOException If writing to the stream failed.
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ // Write out any hidden serialization.
+ s.defaultWriteObject();
+
+ // Write out size of map.
+ s.writeInt(locParts.size());
+
+ // Write out all elements in the proper order.
+ for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+ s.writeInt(e.getKey());
+ // The partition count precedes the ids so that the reader knows how many values follow.
+ s.writeInt(e.getValue().size());
+
+ for (Integer partId : e.getValue())
+ s.writeInt(partId);
+ }
+ }
+
+ /**
+ * Restores the partitions map (a {@code transient} field) from a stream, reading the layout produced by
+ * {@code writeObject}: number of cache groups, then for each group its id, partition count and partition ids.
+ *
+ * @param s Stream to read from.
+ * @throws IOException If reading failed; {@link InvalidObjectException} is thrown for a negative size.
+ * @throws ClassNotFoundException Declared for the default deserialization step.
+ */
+ private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+ // Read in any hidden serialization.
+ s.defaultReadObject();
+
+ // Read size and verify non-negative.
+ int size = s.readInt();
+
+ if (size < 0)
+ throw new InvalidObjectException("Illegal size: " + size);
+
+ // The field is re-created here, so it cannot be declared final.
+ locParts = U.newHashMap(size);
+
+ // Read in all elements in the proper order.
+ for (int i = 0; i < size; i++) {
+ int grpId = s.readInt();
+ int total = s.readInt();
+
+ if (total < 0)
+ throw new InvalidObjectException("Illegal size: " + total);
+
+ Set<Integer> parts = U.newHashSet(total);
+
+ for (int k = 0; k < total; k++)
+ parts.add(s.readInt());
+
+ locParts.put(grpId, parts);
+ }
 }
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index a227f66..6b4fba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -103,7 +104,8 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
if (!grps.remove(grpId))
continue;
- Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+ Set<Integer> parts = meta.partitions().get(grpId) == null ? Collections.emptySet() :
+ new HashSet<>(meta.partitions().get(grpId));
for (File part : cachePartitionFiles(dir)) {
int partId = partId(part.getName());
@@ -213,6 +215,11 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
}
);
}
+ catch (Throwable t) {
+ log.error("Error executing handler: ", t);
+
+ throw t;
+ }
finally {
for (GridComponent comp : snpCtx)
comp.stop(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
new file mode 100644
index 0000000..4894032
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+
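+/** Snapshot task which sends the requested partition files of a local snapshot to the node that requested them. */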
+public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask<Void> {
+ /**
+ * @param cctx Shared context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param snpName Unique identifier of snapshot process.
+ * @param tmpWorkDir Working directory for intermediate snapshot results.
+ * @param ioFactory Factory to working with snapshot files.
+ * @param snpSndr Factory which produces snapshot receiver instance.
+ * @param parts Map of cache group ids and corresponding sets of partition ids to be sent.
+ */
+ public SnapshotResponseRemoteFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ String snpName,
+ File tmpWorkDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Matches every requested partition against the metadata of the local snapshot and schedules sending of
+ * the matched partition files on the sender's executor. Requested partitions that are found in no metadata
+ * are reported as an error. The task is closed once all scheduled sends have completed.
+ *
+ * @return {@code false} if no partitions were requested or an exception was thrown during preparation,
+ * {@code true} otherwise.
+ */
+ @Override public boolean start() {
+ if (F.isEmpty(parts))
+ return false;
+
+ try {
+ // Flatten the request into (group, partition) pairs; a null partition set contributes nothing.
+ List<GroupPartitionId> handled = new ArrayList<>();
+
+ for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+ ofNullable(e.getValue()).orElse(Collections.emptySet())
+ .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
+ }
+
+ snpSndr.init(handled.size());
+
+ File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName);
+
+ List<CompletableFuture<Void>> futs = new ArrayList<>();
+ List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName);
+
+ for (SnapshotMetadata meta : metas) {
+ Map<Integer, Set<Integer>> parts0 = meta.partitions();
+
+ if (F.isEmpty(parts0))
+ continue;
+
+ // A pair is removed from the list as soon as a metadata containing it is found,
+ // so each requested partition is scheduled at most once.
+ handled.removeIf(gp -> {
+ if (ofNullable(parts0.get(gp.getGroupId()))
+ .orElse(Collections.emptySet())
+ .contains(gp.getPartitionId())
+ ) {
+ futs.add(CompletableFuture.runAsync(() -> {
+ // Skip the work if an error has already been recorded.
+ if (err.get() != null)
+ return;
+
+ File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
+ gp.getGroupId());
+
+ if (cacheDir == null) {
+ throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
+ ", pair=" + gp + ']');
+ }
+
+ File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
+
+ if (!snpPart.exists()) {
+ throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
+ ", pair=" + gp + ']');
+ }
+
+ snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
+ },
+ snpSndr.executor())
+ // Only the first failure is remembered; on success t is null and the stored value is unchanged.
+ .whenComplete((r, t) -> err.compareAndSet(null, t)));
+
+ return true;
+ }
+
+ return false;
+ });
+ }
+
+ // Whatever is left was requested but is present in none of the local snapshot metadatas.
+ if (!handled.isEmpty()) {
+ err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node [snpName=" + snpName +
+ ", missed=" + handled + ']'));
+ }
+
+ int size = futs.size();
+
+ CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+ .whenComplete((r, t) -> {
+ // A previously recorded error takes precedence over the aggregated one.
+ Throwable th = ofNullable(err.get()).orElse(t);
+
+ if (th == null && log.isInfoEnabled()) {
+ log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
+ ", rmtNodeId=" + srcNodeId + ']');
+ }
+
+ close(th);
+ });
+
+ return true;
+ }
+ catch (Throwable t) {
+ // Close the task only if this is the first error recorded for it.
+ if (err.compareAndSet(null, t))
+ close(t);
+
+ return false;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Only the first reported error closes the task. A {@code null} argument is ignored: without this guard
+ * it would pass the compare-and-set below and complete the task as if it had finished successfully.
+ */
+ @Override public void acceptException(Throwable th) {
+ if (th == null)
+ return;
+
+ if (err.compareAndSet(null, th))
+ close(th);
+ }
+
+ /**
+ * Closes the snapshot sender and completes this task: successfully when no error is given,
+ * with the error otherwise.
+ *
+ * @param th Additional close exception if occurred.
+ */
+ private void close(@Nullable Throwable th) {
+ // The sender receives the same (possibly null) cause in both outcomes.
+ snpSndr.close(th);
+
+ if (th != null)
+ onDone(th);
+ else
+ onDone((Void)null);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 99a730a..c796299 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -18,7 +18,10 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
@@ -26,30 +29,38 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
+import java.util.function.IntFunction;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
@@ -62,16 +73,23 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
@@ -96,7 +114,10 @@ public class SnapshotRestoreProcess {
private final GridKernalContext ctx;
/** Cache group restore prepare phase. */
- private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+ private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
+
+ /** Cache group restore preload partitions phase. */
+ private final DistributedProcess<UUID, Boolean> preloadProc;
/** Cache group restore cache start phase. */
private final DistributedProcess<UUID, Boolean> cacheStartProc;
@@ -124,6 +145,9 @@ public class SnapshotRestoreProcess {
prepareRestoreProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
+ preloadProc = new DistributedProcess<>(
+ ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, this::preload, this::finishPreload);
+
cacheStartProc = new DistributedProcess<>(
ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
@@ -248,19 +272,16 @@ public class SnapshotRestoreProcess {
cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v));
for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
- SnapshotMetadata meta = F.first(entry.getValue());
-
- assert meta != null : entry.getKey().id();
-
- if (!entry.getKey().consistentId().toString().equals(meta.consistentId()))
- continue;
+ dataNodes.add(entry.getKey().id());
- if (snpBltNodes == null)
- snpBltNodes = new HashSet<>(meta.baselineNodes());
+ for (SnapshotMetadata meta : entry.getValue()) {
+ assert meta != null : entry.getKey().id();
- dataNodes.add(entry.getKey().id());
+ if (snpBltNodes == null)
+ snpBltNodes = new HashSet<>(meta.baselineNodes());
- reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+ reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+ }
}
if (snpBltNodes == null) {
@@ -277,20 +298,10 @@ public class SnapshotRestoreProcess {
return;
}
- Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
- node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
-
- snpBltNodes.removeAll(bltNodes);
-
- if (!snpBltNodes.isEmpty()) {
- finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
- "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
-
- return;
- }
+ Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
SnapshotOperationRequest req =
- new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+ new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, new HashSet<>(bltNodes));
prepareRestoreProc.start(req.requestId(), req);
});
@@ -315,28 +326,36 @@ public class SnapshotRestoreProcess {
}
/**
- * Check if the cache or group with the specified name is currently being restored from the snapshot.
- *
- * @param cacheName Cache name.
- * @param grpName Cache group name.
+ * @param ccfg Cache configuration.
* @return {@code True} if the cache or group with the specified name is currently being restored.
*/
- public boolean isRestoring(String cacheName, @Nullable String grpName) {
- assert cacheName != null;
+ public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+ return isRestoring(ccfg, opCtx);
+ }
- SnapshotRestoreContext opCtx0 = opCtx;
+ /**
+ * Check if the cache or group with the specified name is currently being restored from the snapshot.
+ * @param opCtx Restoring context.
+ * @param ccfg Cache configuration.
+ * @return {@code True} if the cache or group with the specified name is currently being restored.
+ */
+ private boolean isRestoring(CacheConfiguration<?, ?> ccfg, @Nullable SnapshotRestoreContext opCtx) {
+ assert ccfg != null;
- if (opCtx0 == null)
+ if (opCtx == null)
return false;
- Map<Integer, StoredCacheData> cacheCfgs = opCtx0.cfgs;
+ Map<Integer, StoredCacheData> cacheCfgs = opCtx.cfgs;
+
+ String cacheName = ccfg.getName();
+ String grpName = ccfg.getGroupName();
int cacheId = CU.cacheId(cacheName);
if (cacheCfgs.containsKey(cacheId))
return true;
- for (File grpDir : opCtx0.dirs) {
+ for (File grpDir : opCtx.dirs) {
String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
if (grpName != null) {
@@ -364,7 +383,7 @@ public class SnapshotRestoreProcess {
if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
return Collections.emptySet();
- return Collections.unmodifiableSet(opCtx0.nodes);
+ return new HashSet<>(opCtx0.nodes());
}
/**
@@ -412,7 +431,7 @@ public class SnapshotRestoreProcess {
public void onNodeLeft(UUID leftNodeId) {
SnapshotRestoreContext opCtx0 = opCtx;
- if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) {
+ if (opCtx0 != null && opCtx0.nodes().contains(leftNodeId)) {
opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
"Required node has left the cluster [nodeId=" + leftNodeId + ']'));
}
@@ -498,17 +517,18 @@ public class SnapshotRestoreProcess {
* @param req Request to prepare cache group restore from the snapshot.
* @return Result future.
*/
- private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
+ private IgniteInternalFuture<SnapshotRestoreOperationResponse> prepare(SnapshotOperationRequest req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
try {
DiscoveryDataClusterState state = ctx.state().clusterState();
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
if (state.state() != ClusterState.ACTIVE || state.transition())
throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
- if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+ if (snpMgr.isSnapshotCreating())
throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
if (ctx.encryption().isMasterKeyChangeInProgress()) {
@@ -530,6 +550,13 @@ public class SnapshotRestoreProcess {
}
}
+ if (log.isInfoEnabled()) {
+ log.info("Starting local snapshot prepare restore operation" +
+ " [reqId=" + req.requestId() +
+ ", snapshot=" + req.snapshotName() +
+ ", caches=" + req.groups() + ']');
+ }
+
SnapshotRestoreContext opCtx0 = prepareContext(req);
synchronized (this) {
@@ -537,13 +564,10 @@ public class SnapshotRestoreProcess {
ClusterSnapshotFuture fut0 = fut;
- if (fut0 != null && fut0.interruptEx != null)
- opCtx0.err.compareAndSet(null, fut0.interruptEx);
+ if (fut0 != null)
+ opCtx0.errHnd.accept(fut0.interruptEx);
}
- if (opCtx0.dirs.isEmpty())
- return new GridFinishedFuture<>();
-
// Ensure that shared cache groups has no conflicts.
for (StoredCacheData cfg : opCtx0.cfgs.values()) {
ensureCacheAbsent(cfg.config().getName());
@@ -552,46 +576,11 @@ public class SnapshotRestoreProcess {
ensureCacheAbsent(cfg.config().getGroupName());
}
- if (log.isInfoEnabled()) {
- log.info("Starting local snapshot restore operation" +
- " [reqId=" + req.requestId() +
- ", snapshot=" + req.snapshotName() +
- ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
- }
-
- Consumer<Throwable> errHnd = (ex) -> opCtx.err.compareAndSet(null, ex);
- BooleanSupplier stopChecker = () -> opCtx.err.get() != null;
- GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
-
if (ctx.isStopping())
- throw new NodeStoppingException("Node is stopping.");
-
- opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ throw new NodeStoppingException("The node is stopping: " + ctx.localNodeId());
- restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
- .thenAccept(res -> {
- try {
- Throwable err = opCtx.err.get();
-
- if (err != null)
- throw err;
-
- for (File src : opCtx0.dirs)
- Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
- }
- catch (Throwable t) {
- log.error("Unable to restore cache group(s) from the snapshot " +
- "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t);
-
- retFut.onDone(t);
-
- return;
- }
-
- retFut.onDone(new ArrayList<>(opCtx.cfgs.values()));
- });
-
- return retFut;
+ return new GridFinishedFuture<>(new SnapshotRestoreOperationResponse(opCtx0.cfgs.values(),
+ opCtx0.metasPerNode.get(ctx.localNodeId())));
}
catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
log.error("Unable to restore cache group(s) from the snapshot " +
@@ -605,85 +594,11 @@ public class SnapshotRestoreProcess {
* @param cacheDir Cache directory.
* @return Temporary directory.
*/
- private File formatTmpDirName(File cacheDir) {
+ private static File formatTmpDirName(File cacheDir) {
return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
}
/**
- * Copy partition files and update binary metadata.
- *
- * @param snpName Snapshot name.
- * @param dirs Cache directories to restore from the snapshot.
- * @param updateMeta Update binary metadata flag.
- * @param stopChecker Process interrupt checker.
- * @param errHnd Error handler.
- * @throws IgniteCheckedException If failed.
- */
- private CompletableFuture<Void> restoreAsync(
- String snpName,
- Collection<File> dirs,
- boolean updateMeta,
- BooleanSupplier stopChecker,
- Consumer<Throwable> errHnd
- ) throws IgniteCheckedException {
- IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
- String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
-
- List<CompletableFuture<Void>> futs = new ArrayList<>();
-
- if (updateMeta) {
- File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
-
- futs.add(CompletableFuture.runAsync(() -> {
- try {
- ctx.cacheObjects().updateMetadata(binDir, stopChecker);
- }
- catch (Throwable t) {
- errHnd.accept(t);
- }
- }, snapshotMgr.snapshotExecutorService()));
- }
-
- for (File cacheDir : dirs) {
- File tmpCacheDir = formatTmpDirName(cacheDir);
- File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName),
- Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
-
- assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
-
- for (File snpFile : snpCacheDir.listFiles()) {
- futs.add(CompletableFuture.runAsync(() -> {
- if (stopChecker.getAsBoolean())
- return;
-
- try {
- if (Thread.interrupted())
- throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
-
- File target = new File(tmpCacheDir, snpFile.getName());
-
- if (log.isDebugEnabled()) {
- log.debug("Copying file from the snapshot " +
- "[snapshot=" + snpName +
- ", src=" + snpFile +
- ", target=" + target + "]");
- }
-
- IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
- }
- catch (Throwable t) {
- errHnd.accept(t);
- }
- }, ctx.cache().context().snapshotMgr().snapshotExecutorService()));
- }
- }
-
- int futsSize = futs.size();
-
- return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
- }
-
- /**
* @param req Request to prepare cache group restore from the snapshot.
* @return Snapshot restore operation context.
* @throws IgniteCheckedException If failed.
@@ -693,76 +608,76 @@ public class SnapshotRestoreProcess {
throw new IgniteCheckedException(OP_REJECT_MSG +
"The previous snapshot restore operation was not completed.");
}
-
GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
- SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
+ List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName());
+
+ // Collection of baseline nodes that must survive and additional discovery data required for the affinity calculation.
+ DiscoCache discoCache = ctx.discovery().discoCache();
+
+ if (!F.transform(discoCache.aliveBaselineNodes(), F.node2id()).containsAll(req.nodes()))
+ throw new IgniteCheckedException("Restore context cannot be inited since the required baseline nodes missed: " + discoCache);
+
+ DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
- if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
- return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
+ if (F.first(metas) == null)
+ return new SnapshotRestoreContext(req, discoCache0, Collections.emptyMap(), cctx.localNodeId(), Collections.emptyList());
- if (meta.pageSize() != cctx.database().pageSize()) {
+ if (F.first(metas).pageSize() != cctx.database().pageSize()) {
throw new IgniteCheckedException("Incompatible memory page size " +
- "[snapshotPageSize=" + meta.pageSize() +
+ "[snapshotPageSize=" + F.first(metas).pageSize() +
", local=" + cctx.database().pageSize() +
", snapshot=" + req.snapshotName() +
", nodeId=" + cctx.localNodeId() + ']');
}
- List<File> cacheDirs = new ArrayList<>();
Map<String, StoredCacheData> cfgsByName = new HashMap<>();
FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
// Collect the cache configurations and prepare a temporary directory for copying files.
// Metastorage can be restored only manually by directly copying files.
- for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
- name -> !METASTORAGE_CACHE_NAME.equals(name)))
- {
- String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
+ for (SnapshotMetadata meta : metas) {
+ for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
+ name -> !METASTORAGE_CACHE_NAME.equals(name))) {
+ String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
- if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
- continue;
+ if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
+ continue;
- File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
+ File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
- if (cacheDir.exists()) {
- if (!cacheDir.isDirectory()) {
- throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
- "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
- }
+ if (cacheDir.exists()) {
+ if (!cacheDir.isDirectory()) {
+ throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
+ "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
+ }
- if (cacheDir.list().length > 0) {
- throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
- "[group=" + grpName + ", dir=" + cacheDir + ']');
- }
+ if (cacheDir.list().length > 0) {
+ throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
+ "[group=" + grpName + ", dir=" + cacheDir + ']');
+ }
- if (!cacheDir.delete()) {
- throw new IgniteCheckedException("Unable to remove empty cache directory " +
- "[group=" + grpName + ", dir=" + cacheDir + ']');
+ if (!cacheDir.delete()) {
+ throw new IgniteCheckedException("Unable to remove empty cache directory " +
+ "[group=" + grpName + ", dir=" + cacheDir + ']');
+ }
}
- }
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ File tmpCacheDir = formatTmpDirName(cacheDir);
- if (tmpCacheDir.exists()) {
- throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
- "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
- }
+ if (tmpCacheDir.exists()) {
+ throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
+ "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+ }
- if (!tmpCacheDir.mkdir()) {
- throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
- "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+ pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
}
-
- cacheDirs.add(cacheDir);
-
- pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
}
Map<Integer, StoredCacheData> cfgsById =
cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
- return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
+ return new SnapshotRestoreContext(req, discoCache0, cfgsById, cctx.localNodeId(), metas);
}
/**
@@ -770,7 +685,7 @@ public class SnapshotRestoreProcess {
* @param res Results.
* @param errs Errors.
*/
- private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
+ private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Exception> errs) {
if (ctx.clientNode())
return;
@@ -787,31 +702,358 @@ public class SnapshotRestoreProcess {
}
if (failure == null)
- failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+ failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs) {
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+ opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+ }
+ }
+
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
if (U.isLocalNodeCoordinator(ctx.discovery()))
+ preloadProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param ctx Kernal context.
+ * @param ccfg Cache configuration.
+ * @param cache Discovery cache.
+ * @return Affinity assignment cache calculated for the topology version of the given discovery cache.
+ */
+ private static GridAffinityAssignmentCache calculateAffinity(
+ GridKernalContext ctx,
+ CacheConfiguration<?, ?> ccfg,
+ DiscoCache cache
+ ) {
+ GridAffinityAssignmentCache aff = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+ aff.calculate(cache.version(), null, cache);
+
+ return aff;
+ }
+
+ /**
+ * @param metas List of snapshot metadata to check.
+ * @param grpId Group id.
+ * @param parts Set of partitions to search for.
+ * @return Snapshot metadata which contains exactly the given set of partitions or {@code null} otherwise.
+ */
+ private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+ List<SnapshotMetadata> metas,
+ int grpId,
+ Set<Integer> parts
+ ) {
+ assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+ // Try to copy everything right from the single snapshot part.
+ for (SnapshotMetadata candidate : metas) {
+ // Private copy of the group partitions of this snapshot part, the index partition is not compared.
+ Set<Integer> dataParts = new HashSet<>(ofNullable(candidate.partitions().get(grpId)).orElse(Collections.emptySet()));
+
+ dataParts.remove(INDEX_PARTITION);
+
+ if (dataParts.equals(parts))
+ return candidate;
+ }
+
+ return null;
+ }
+
+ /**
+ * Copies partition files from local snapshot parts and remote nodes to temporary dirs, then moves the dirs in place.
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends ({@code true} on success).
+ */
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+ if (opCtx0.dirs.isEmpty())
+ return new GridFinishedFuture<>();
+
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting snapshot preload operation to restore cache groups" +
+ "[snapshot=" + opCtx0.snpName +
+ ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+ }
+
+ CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+ File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
+
+ ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+ for (StoredCacheData data : opCtx0.cfgs.values()) {
+ opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+ grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+ }
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+ ClusterNode locNode = ctx.cache().context().localNode();
+
+ for (File dir : opCtx0.dirs) {
+ String cacheOrGrpName = cacheGroupName(dir);
+ int grpId = CU.cacheId(cacheOrGrpName);
+
+ File tmpCacheDir = formatTmpDirName(dir);
+ tmpCacheDir.mkdir();
+
+ Set<PartitionRestoreFuture> leftParts;
+
+ opCtx0.locProgress.put(grpId,
+ nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+ rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+ if (leftParts.isEmpty())
+ continue;
+
+ SnapshotMetadata full = findMetadataWithSamePartitions(locMetas, grpId,
+ leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+ for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+ if (leftParts.isEmpty())
+ break;
+
+ File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+ Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+ leftParts.removeIf(partFut -> {
+ boolean doCopy = ofNullable(meta.partitions().get(grpId))
+ .orElse(Collections.emptySet())
+ .contains(partFut.partId);
+
+ if (doCopy) {
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx0,
+ snpCacheDir,
+ tmpCacheDir,
+ partFut);
+ }
+
+ return doCopy;
+ });
+
+ if (meta == full) {
+ assert leftParts.isEmpty() : leftParts;
+
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+ "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+ }
+
+ File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+ if (idxFile.exists()) {
+ PartitionRestoreFuture idxFut;
+
+ opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+ .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx0,
+ snpCacheDir,
+ tmpCacheDir,
+ idxFut);
+ }
+ }
+ }
+ }
+
+ // Load other partitions from remote nodes.
+ List<PartitionRestoreFuture> rmtAwaitParts = rmtLoadParts.values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ // This is necessary for sending only one partitions request per each cluster node.
+ Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(
+ opCtx0.metasPerNode.entrySet()
+ .stream()
+ .filter(e -> !e.getKey().equals(ctx.localNodeId()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+ (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+ rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId)));
+
+ Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+ .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)), d -> d));
+
+ try {
+ if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+ log.info("Trying to request partitions from remote nodes" +
+ "[snapshot=" + opCtx0.snpName +
+ ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+ e -> partitionsMapToCompactString(e.getValue()))) + ']');
+ }
+
+ for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+ ctx.cache().context().snapshotMgr()
+ .requestRemoteSnapshotFiles(m.getKey(),
+ opCtx0.snpName,
+ m.getValue(),
+ opCtx0.stopChecker,
+ (snpFile, t) -> {
+ if (opCtx0.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
+
+ if (t == null) {
+ int grpId = CU.cacheId(cacheGroupName(snpFile.getParentFile()));
+ int partId = partId(snpFile.getName());
+
+ PartitionRestoreFuture partFut = F.find(opCtx0.locProgress.get(grpId),
+ null,
+ new IgnitePredicate<PartitionRestoreFuture>() {
+ @Override public boolean apply(PartitionRestoreFuture f) {
+ return f.partId == partId;
+ }
+ });
+
+ assert partFut != null : snpFile.getAbsolutePath();
+
+ File tmpCacheDir = formatTmpDirName(grpToDir.get(grpId));
+
+ Path partFile = Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName());
+
+ try {
+ IgniteSnapshotManager.copy(snpMgr.ioFactory(),
+ snpFile,
+ partFile.toFile(),
+ snpFile.length());
+
+ partFut.complete(partFile);
+ }
+ catch (Exception e) {
+ opCtx0.errHnd.accept(e);
+ completeListExceptionally(rmtAwaitParts, e);
+ }
+ }
+ else {
+ opCtx0.errHnd.accept(t);
+ completeListExceptionally(rmtAwaitParts, t);
+ }
+ });
+ }
+ }
+ catch (IgniteCheckedException e) {
+ opCtx0.errHnd.accept(e);
+ completeListExceptionally(rmtAwaitParts, e);
+ }
+
+ List<PartitionRestoreFuture> allParts = opCtx0.locProgress.values().stream().flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ int size = allParts.size();
+
+ CompletableFuture.allOf(allParts.toArray(new CompletableFuture[size]))
+ .runAfterBothAsync(metaFut, () -> {
+ try {
+ if (opCtx0.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
+
+ for (File src : opCtx0.dirs)
+ Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }, snpMgr.snapshotExecutorService())
+ .whenComplete((r, t) -> opCtx0.errHnd.accept(t))
+ .whenComplete((res, t) -> {
+ Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
+
+ if (t0 == null)
+ retFut.onDone(true);
+ else {
+ log.error("Unable to restore cache group(s) from a snapshot " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ']', t0);
+
+ retFut.onDone(t0);
+ }
+ });
+ }
+ catch (Exception ex) {
+ opCtx0.errHnd.accept(ex);
+ // The stop future may already be chained on retFut, so retFut must be completed on this path as well.
+ retFut.onDone(ex);
+
+ return new GridFinishedFuture<>(ex);
+ }
+
+ return retFut;
+ }
+
+ /**
+ * @param reqId Request ID, also used to start the follow-up rollback or cache start process.
+ * @param res Results; the key set is passed to {@code checkNodeLeft} along with the required nodes.
+ * @param errs Errors; the first one (if any) is taken as the failure of the preload phase.
+ */
+ private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
+
+ opCtx0.errHnd.accept(failure);
+
+ if (failure != null) {
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
cacheStartProc.start(reqId, reqId);
}
@@ -843,7 +1085,8 @@ public class SnapshotRestoreProcess {
// We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
// the cluster during the cache startup, the whole procedure will be rolled back.
- return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+ return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false,
+ IgniteUuid.fromUuid(reqId));
}
/**
@@ -858,7 +1101,7 @@ public class SnapshotRestoreProcess {
SnapshotRestoreContext opCtx0 = opCtx;
Exception failure = errs.values().stream().findFirst().
- orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+ orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
if (failure == null) {
finishProcess(reqId);
@@ -873,11 +1116,48 @@ public class SnapshotRestoreProcess {
}
/**
+ * Collects, for each node, the snapshot partitions accepted by the given filter. Nodes are visited in
+ * a random order, the partitions of each node are offered to the filter in the order they are stored
+ * in the node snapshot metadata.
+ *
+ * @param metas Map of snapshot metadata distribution across the cluster.
+ * @param filter Predicate over a cache group id and a partition id, only accepted partitions are collected.
+ * @return Map of cache partitions per each node.
+ */
+ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+ Map<UUID, List<SnapshotMetadata>> metas,
+ BiPredicate<Integer, Integer> filter
+ ) {
+ Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+ List<UUID> nodeIds = new ArrayList<>(metas.keySet());
+ Collections.shuffle(nodeIds);
+
+ // Insertion-ordered copy, so that the iteration below follows the shuffled order.
+ Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
+
+ for (UUID id : nodeIds)
+ shuffleMetas.put(id, metas.get(id));
+
+ shuffleMetas.forEach((nodeId, nodeMetas) -> {
+ if (nodeMetas == null)
+ return;
+
+ for (SnapshotMetadata meta : nodeMetas) {
+ Map<Integer, Set<Integer>> parts = meta.partitions();
+
+ if (parts == null)
+ continue;
+
+ parts.forEach((grpId, partIds) -> {
+ for (Integer partId : partIds) {
+ if (!filter.test(grpId, partId))
+ continue;
+
+ nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
+ .computeIfAbsent(grpId, k -> new HashSet<>())
+ .add(partId);
+ }
+ });
+ }
+ });
+
+ return nodeToSnp;
+ }
+
+ /**
* @param reqNodes Set of required topology nodes.
* @param respNodes Set of responding topology nodes.
* @return Error, if no response was received from the required topology node.
*/
- private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
+ private Exception checkNodeLeft(Collection<UUID> reqNodes, Set<UUID> respNodes) {
if (!respNodes.containsAll(reqNodes)) {
Set<UUID> leftNodes = new HashSet<>(reqNodes);
@@ -907,46 +1187,46 @@ public class SnapshotRestoreProcess {
synchronized (this) {
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ }
- try {
- ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
- if (log.isInfoEnabled()) {
- log.info("Removing restored cache directories [reqId=" + reqId +
- ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
- }
+ try {
+ ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+ if (log.isInfoEnabled()) {
+ log.info("Removing restored cache directories [reqId=" + reqId +
+ ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+ }
- IgniteCheckedException ex = null;
+ IgniteCheckedException ex = null;
- for (File cacheDir : opCtx0.dirs) {
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ for (File cacheDir : opCtx0.dirs) {
+ File tmpCacheDir = formatTmpDirName(cacheDir);
- if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
- log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
- "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+ if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+ log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
- ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
- }
+ ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
+ }
- if (cacheDir.exists() && !U.delete(cacheDir)) {
- log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
- "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+ if (cacheDir.exists() && !U.delete(cacheDir)) {
+ log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
- ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
- }
+ ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
}
+ }
- if (ex != null)
- retFut.onDone(ex);
- else
- retFut.onDone(true);
- });
- }
- catch (RejectedExecutionException e) {
- log.error("Unable to perform rollback routine, task has been rejected " +
- "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+ if (ex != null)
+ retFut.onDone(ex);
+ else
+ retFut.onDone(true);
+ });
+ }
+ catch (RejectedExecutionException e) {
+ log.error("Unable to perform rollback routine, task has been rejected " +
+ "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
- retFut.onDone(e);
- }
+ retFut.onDone(e);
}
return retFut;
@@ -968,8 +1248,8 @@ public class SnapshotRestoreProcess {
SnapshotRestoreContext opCtx0 = opCtx;
- if (!res.keySet().containsAll(opCtx0.nodes)) {
- Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes);
+ if (!res.keySet().containsAll(opCtx0.nodes())) {
+ Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes());
leftNodes.removeAll(res.keySet());
@@ -981,6 +1261,83 @@ public class SnapshotRestoreProcess {
}
/**
+ * Asynchronously copies a single partition file from the local snapshot directory to the target
+ * directory on the snapshot executor and completes the given partition future with the outcome.
+ *
+ * @param mgr Ignite snapshot manager.
+ * @param opCtx Snapshot operation context.
+ * @param srcDir Snapshot directory to copy from.
+ * @param targetDir Destination directory to copy to.
+ * @param partFut Partition future, completed with the copied file path on success or exceptionally on failure.
+ */
+ private static void copyLocalAsync(
+ IgniteSnapshotManager mgr,
+ SnapshotRestoreContext opCtx,
+ File srcDir,
+ File targetDir,
+ PartitionRestoreFuture partFut
+ ) {
+ File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
+ Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+ CompletableFuture.supplyAsync(() -> {
+ // Skip the copy if the operation has already failed elsewhere.
+ if (opCtx.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+ // Thread.interrupted() also clears the interrupt flag of the executor thread.
+ if (Thread.interrupted())
+ throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+ if (!snpFile.exists()) {
+ // Report the snapshot directory itself under 'snpDir', the file name is reported separately.
+ throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+ ", snpDir=" + srcDir.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+ }
+
+ IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());
+
+ return partFile;
+ }, mgr.snapshotExecutorService())
+ // Record the failure (if any) in the operation context first, then complete the partition future.
+ .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+ .whenComplete((r, t) -> {
+ if (t == null)
+ partFut.complete(partFile);
+ else
+ partFut.completeExceptionally(t);
+ });
+ }
+
+ /**
+ * Selects the partitions whose ideal assignment contains the given node and maps each of them with
+ * the given factory.
+ *
+ * @param affCache Affinity cache.
+ * @param node Cluster node to get assigned partitions.
+ * @param factory Function which maps a partition id to an element of the resulting set.
+ * @param <T> Type of the resulting set elements.
+ * @return The set of partitions assigned to the given node.
+ */
+ private static <T> Set<T> nodeAffinityPartitions(
+ GridAffinityAssignmentCache affCache,
+ ClusterNode node,
+ IntFunction<T> factory
+ ) {
+ // Resolve the ideal assignment once instead of re-reading it for every partition.
+ List<List<ClusterNode>> assignment = affCache.idealAssignment().assignment();
+
+ return IntStream.range(0, affCache.partitions())
+ .filter(p -> assignment.get(p).contains(node))
+ .mapToObj(factory)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Completes every future of the given list exceptionally.
+ *
+ * @param col List of partition futures to complete.
+ * @param ex Exception to set.
+ */
+ private static void completeListExceptionally(List<PartitionRestoreFuture> col, Throwable ex) {
+ col.forEach(fut -> fut.completeExceptionally(ex));
+ }
+
+ /**
+ * Renders the given map with the partition set of each cache group replaced by its compact form.
+ *
+ * @param map Map of partitions and cache groups.
+ * @return String representation.
+ */
+ private static String partitionsMapToCompactString(Map<Integer, Set<Integer>> map) {
+ Map<Integer, String> compact = map.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> S.compact(e.getValue())));
+
+ return compact.toString();
+ }
+
+ /**
* Cache group restore from snapshot operation context.
*/
private static class SnapshotRestoreContext {
@@ -990,15 +1347,43 @@ public class SnapshotRestoreProcess {
/** Snapshot name. */
private final String snpName;
- /** Baseline node IDs that must be alive to complete the operation. */
- private final Set<UUID> nodes;
+ /** Baseline discovery cache for node IDs that must be alive to complete the operation.*/
+ private final DiscoCache discoCache;
- /** List of restored cache group directories. */
- private final Collection<File> dirs;
+ /** Operational node id. */
+ private final UUID opNodeId;
+
+ /**
+ * Set of restored cache groups path on local node. Collected when all cache configurations received
+ * from the <tt>prepare</tt> distributed process.
+ */
+ private final Set<File> dirs = new HashSet<>();
/** The exception that led to the interruption of the process. */
private final AtomicReference<Throwable> err = new AtomicReference<>();
+ /** Distribution of snapshot metadata files across the cluster. */
+ private final Map<UUID, List<SnapshotMetadata>> metasPerNode = new HashMap<>();
+
+ /** Context error handler. */
+ private final Consumer<Throwable> errHnd = (ex) -> err.compareAndSet(null, ex);
+
+ /** Stop condition checker. */
+ private final BooleanSupplier stopChecker = () -> err.get() != null;
+
+ /** Progress of processing cache group partitions on the local node.*/
+ private final Map<Integer, Set<PartitionRestoreFuture>> locProgress = new HashMap<>();
+
+ /**
+ * The stop future responsible for stopping cache groups during the rollback phase. Will be completed when the rollback
+ * process executes and all the cache group stop actions completes (the processCacheStopRequestOnExchangeDone finishes
+ * successfully and all the data deleted from disk).
+ */
+ private final GridFutureAdapter<Void> locStopCachesCompleteFut = new GridFutureAdapter<>();
+
+ /** Calculated affinity assignment cache per each cache group. */
+ private final Map<String, GridAffinityAssignmentCache> affCache = new ConcurrentHashMap<>();
+
/** Cache ID to configuration mapping. */
private volatile Map<Integer, StoredCacheData> cfgs;
@@ -1007,17 +1392,90 @@ public class SnapshotRestoreProcess {
/**
* @param req Request to prepare cache group restore from the snapshot.
- * @param dirs List of cache group names to restore from the snapshot.
* @param cfgs Cache ID to configuration mapping.
*/
- protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
- Map<Integer, StoredCacheData> cfgs) {
+ protected SnapshotRestoreContext(
+ SnapshotOperationRequest req,
+ DiscoCache discoCache,
+ Map<Integer, StoredCacheData> cfgs,
+ UUID locNodeId,
+ List<SnapshotMetadata> locMetas
+ ) {
reqId = req.requestId();
snpName = req.snapshotName();
- nodes = new HashSet<>(req.nodes());
+ opNodeId = req.operationalNodeId();
+ this.discoCache = discoCache;
- this.dirs = dirs;
this.cfgs = cfgs;
+
+ metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+ }
+
+ /**
+ * Computed on each call from the alive baseline nodes of the discovery cache captured by this context.
+ *
+ * @return Required baseline nodeIds that must be alive to complete restore operation.
+ */
+ public Collection<UUID> nodes() {
+ return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+ }
+ }
+
+ /**
+ * Snapshot operation prepare response. Carries the cache configurations and the snapshot metadata
+ * found on the responding node.
+ */
+ private static class SnapshotRestoreOperationResponse implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache configurations on local node. */
+ private final List<StoredCacheData> ccfgs;
+
+ /** Snapshot metadata files on local node. */
+ private final List<SnapshotMetadata> metas;
+
+ /**
+ * Creates the response, both collections are copied.
+ *
+ * @param ccfgs Cache configurations on local node.
+ * @param metas Snapshot metadata files on local node.
+ */
+ public SnapshotRestoreOperationResponse(Collection<StoredCacheData> ccfgs, Collection<SnapshotMetadata> metas) {
+ this.ccfgs = new ArrayList<>(ccfgs);
+ this.metas = new ArrayList<>(metas);
+ }
+ }
+
+ /**
+ * Future will be completed when partition processing ends. Two futures are considered equal
+ * if they have the same partition id.
+ */
+ private static class PartitionRestoreFuture extends CompletableFuture<Path> {
+ /** Partition id. */
+ private final int partId;
+
+ /**
+ * @param partId Partition id.
+ */
+ private PartitionRestoreFuture(int partId) {
+ this.partId = partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ // Same exact class and the same partition id.
+ return o != null && getClass() == o.getClass() && partId == ((PartitionRestoreFuture)o).partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PartitionRestoreFuture.class, this);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 730b9e1..d91b5bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -453,6 +453,11 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
/**
+ * Cache group restore preload phase.
+ */
+ RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
+
+ /**
* Cache group restore cache start phase.
*/
RESTORE_CACHE_GROUP_SNAPSHOT_START,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 77de5cd..e7c06a1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -716,7 +716,7 @@ public class GridFunc {
*
* @return Closure which converts node to node ID.
*/
- public static IgniteClosure<ClusterNode, UUID> node2id() {
+ public static IgniteClosure<? super ClusterNode, UUID> node2id() {
return NODE2ID;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 93ef0e2..82847cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -37,6 +37,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
@@ -358,9 +360,32 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
Function<Integer, V> factory,
CacheConfiguration<Integer, V>... ccfgs
) throws Exception {
- for (int g = 0; g < grids; g++)
- startGrid(optimize(getConfiguration(getTestIgniteInstanceName(g))
- .setCacheConfiguration(ccfgs)));
+ return startGridsWithCache(grids, keys, factory, (id, cfg) -> cfg.getWorkDirectory(), ccfgs);
+ }
+
+ /**
+ * @param grids Number of ignite instances to start.
+ * @param keys Number of keys to create.
+ * @param factory Factory which produces values.
+ * @param <V> Cache value type.
+ * @return Ignite coordinator instance.
+ * @throws Exception If fails.
+ */
+ protected <V> IgniteEx startGridsWithCache(
+ int grids,
+ int keys,
+ Function<Integer, V> factory,
+ BiFunction<Integer, IgniteConfiguration, String> newWorkDir,
+ CacheConfiguration<Integer, V>... ccfgs
+ ) throws Exception {
+ for (int g = 0; g < grids; g++) {
+ IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(g))
+ .setCacheConfiguration(ccfgs));
+
+ cfg.setWorkDirectory(newWorkDir.apply(g, cfg));
+
+ startGrid(cfg);
+ }
IgniteEx ig = grid(0);
@@ -519,32 +544,6 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
}
/**
- * @param snpName Unique snapshot name.
- * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
- * @param snpSndr Sender which used for snapshot sub-task processing.
- * @return Future which will be completed when snapshot is done.
- */
- protected SnapshotFutureTask startLocalSnapshotTask(
- GridCacheSharedContext<?, ?> cctx,
- String snpName,
- Map<Integer, Set<Integer>> parts,
- SnapshotSender snpSndr
- ) throws IgniteCheckedException {
- SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, encryption, snpSndr);
-
- snpFutTask.start();
-
- // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
- // due to checkpoint already running and we need to schedule the next one
- // right after current will be completed.
- cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
-
- snpFutTask.started().get();
-
- return snpFutTask;
- }
-
- /**
* @param grids Grids to block snapshot executors.
* @return Wrapped snapshot executor list.
*/
@@ -604,6 +603,39 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
}
/**
+ * Registers a local snapshot task, starts it, forces a checkpoint and waits for the task to report
+ * that it has started.
+ *
+ * @param cctx Shared cache context, used to access the snapshot manager, the local node id and the database manager.
+ * @param snpName Unique snapshot name.
+ * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+ * @param withMetaStorage Flag passed as is to the snapshot task registration, presumably controls whether
+ * the metastorage is included into the snapshot (TODO confirm against registerSnapshotTask).
+ * @param snpSndr Sender which used for snapshot sub-task processing.
+ * @return Future which will be completed when snapshot is done.
+ * @throws IgniteCheckedException If the registered task is not a {@code SnapshotFutureTask} or one of the
+ * invoked operations fails.
+ */
+ protected static IgniteInternalFuture<?> startLocalSnapshotTask(
+ GridCacheSharedContext<?, ?> cctx,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ boolean withMetaStorage,
+ SnapshotSender snpSndr
+ ) throws IgniteCheckedException {
+ AbstractSnapshotFutureTask<?> task = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts,
+ withMetaStorage, snpSndr);
+
+ // Any other task type cannot be started below, so it is reported as a registration failure.
+ if (!(task instanceof SnapshotFutureTask))
+ throw new IgniteCheckedException("Snapshot task hasn't been registered: " + task);
+
+ SnapshotFutureTask snpFutTask = (SnapshotFutureTask)task;
+
+ snpFutTask.start();
+
+ // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+ // due to checkpoint already running and we need to schedule the next one
+ // right after current will be completed.
+ cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
+
+ snpFutTask.started().get();
+
+ return snpFutTask;
+ }
+
+ /**
* @param ignite Ignite instance to resolve discovery spi to.
* @return BlockingCustomMessageDiscoverySpi instance.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 49a3c62..c5b4ccb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -598,6 +598,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
SNAPSHOT_NAME,
F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+ encryption,
mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
snpFut.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
index 579e9c3..a876f74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
@@ -21,12 +21,25 @@ import java.util.Collections;
import java.util.function.Function;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.lang.IgniteFuture;
import org.junit.runners.Parameterized;
/**
* Snapshot restore test base.
*/
public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest {
+ /** Cache 1 name. */
+ protected static final String CACHE1 = "cache1";
+
+ /** Cache 2 name. */
+ protected static final String CACHE2 = "cache2";
+
+ /** Default shared cache group name. */
+ protected static final String SHARED_GRP = "shared";
+
/** Parameters. Encrypted snapshots are not supported. */
@Parameterized.Parameters(name = "Encryption is disabled")
public static Iterable<Boolean> disabledEncryption() {
@@ -43,6 +56,29 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps
return startGridsWithSnapshot(nodesCnt, keysCnt, false);
}
+ /**
+ * Configures the given SPI to block {@code SingleNodeMessage}s of the given restore phase, starts
+ * restoring the given cache group from grid 0 and calls {@code spi.waitForBlocked()} before returning.
+ *
+ * @param spi Test communication spi.
+ * @param restorePhase The type of distributed process on which communication is blocked.
+ * @param grpName Cache group name.
+ * @return Snapshot restore future.
+ * @throws InterruptedException if interrupted.
+ */
+ protected IgniteFuture<Void> waitForBlockOnRestore(
+ TestRecordingCommunicationSpi spi,
+ DistributedProcess.DistributedProcessType restorePhase,
+ String grpName
+ ) throws InterruptedException {
+ // The message type is matched against the ordinal of the distributed process type.
+ spi.blockMessages((node, msg) ->
+ msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
+
+ IgniteFuture<Void> fut =
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
+
+ spi.waitForBlocked();
+
+ return fut;
+ }
+
/** */
protected class BinaryValueBuilder implements Function<Integer, Object> {
/** Binary type name. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index e6f93b5..b6361e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -59,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.IgniteSpiException;
@@ -74,6 +74,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -86,15 +87,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
/** Type name used for binary and SQL. */
private static final String TYPE_NAME = "CustomType";
- /** Cache 1 name. */
- private static final String CACHE1 = "cache1";
-
- /** Cache 2 name. */
- private static final String CACHE2 = "cache2";
-
- /** Default shared cache group name. */
- private static final String SHARED_GRP = "shared";
-
/** Reset consistent ID flag. */
private boolean resetConsistentId;
@@ -164,14 +156,25 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
awaitPartitionMapExchange();
+ for (Ignite g : G.allGrids())
+ TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
// Restore all cache groups.
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+ awaitPartitionMapExchange(true, true, null, true);
+
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+ // Ensure there is no remote snapshot requests occurred.
+ for (Ignite g : G.allGrids()) {
+ assertTrue("Snapshot files remote requests must not happened due to all the files are available locally",
+ TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+ }
}
/** @throws Exception If failed. */
@@ -364,14 +367,10 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
resetBaselineTopology();
- IgniteFuture<Void> fut =
- grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
- GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
-
- ensureCacheAbsent(dfltCacheCfg);
-
- waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+ assertCacheKeys(grid(0).cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
}
/** @throws Exception If failed. */
@@ -507,7 +506,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
*/
@Test
public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
- checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
+ checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, IgniteCheckedException.class,
"Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
}
@@ -555,7 +554,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
- IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+ IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, DEFAULT_CACHE_NAME);
IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
@@ -574,12 +573,15 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
- GridTestUtils.assertThrowsAnyCause(
- log,
- () -> startGrid(3),
- IgniteSpiException.class,
- "to add the node to cluster - remove directories with the caches"
- );
+ dfltCacheCfg = null;
+
+ // Should start successfully.
+ Ignite ignite = startGrid(3);
+
+ resetBaselineTopology();
+ awaitPartitionMapExchange();
+
+ assertNull(ignite.cache(DEFAULT_CACHE_NAME));
}
/** @throws Exception If failed. */
@@ -594,7 +596,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
CountDownLatch stopLatch = new CountDownLatch(1);
spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
- ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
+ ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD.ordinal());
String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
@@ -760,29 +762,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
}
/**
- * @param spi Test communication spi.
- * @param restorePhase The type of distributed process on which communication is blocked.
- * @param grpName Cache group name.
- * @return Snapshot restore future.
- * @throws InterruptedException if interrupted.
- */
- private IgniteFuture<Void> waitForBlockOnRestore(
- TestRecordingCommunicationSpi spi,
- DistributedProcessType restorePhase,
- String grpName
- ) throws InterruptedException {
- spi.blockMessages((node, msg) ->
- msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
-
- IgniteFuture<Void> fut =
- grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
-
- spi.waitForBlocked();
-
- return fut;
- }
-
- /**
* Custom I/O factory to preprocessing created files.
*/
private static class CustomFileIOFactory implements FileIOFactory {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index 55468f4..dd83af1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -131,7 +131,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
}, 5, "cache-loader-");
// Register task but not schedule it on the checkpoint.
- SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+ SnapshotFutureTask snpFutTask = (SnapshotFutureTask)mgr.registerSnapshotTask(SNAPSHOT_NAME,
cctx.localNodeId(),
F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
encryption,
@@ -256,6 +256,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
SNAPSHOT_NAME,
F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+ encryption,
mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
// Check the right exception thrown.
@@ -279,6 +280,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
IgniteInternalFuture<?> fut = startLocalSnapshotTask(ig.context().cache().context(),
SNAPSHOT_NAME,
parts,
+ encryption,
new DelegateSnapshotSender(log, mgr0.snapshotExecutorService(),
mgr0.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
@Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
@@ -314,6 +316,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
SNAPSHOT_NAME,
F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+ encryption,
new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
@Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
new file mode 100644
index 0000000..8d78836
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
+        int rqCnt = 10;
+
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), grid(1).localNode().id());
+
+        awaitPartitionMapExchange();
+        // The latch expects every requested partition to be consumed once by each of the rqCnt requests.
+        CountDownLatch latch = new CountDownLatch(parts.values().stream().mapToInt(Set::size).sum() * rqCnt);
+        GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<?> runFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                Map<Integer, Set<Integer>> parts0 = new HashMap<>();
+                // Per-request mutable copy: the consumer removes each received partition from it.
+                for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                    parts0.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
+
+                IgniteInternalFuture<Void> locFut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+                    SNAPSHOT_NAME,
+                    parts,
+                    () -> false,
+                    defaultPartitionConsumer(parts0, latch));
+
+                compFut.add(locFut);
+
+                locFut.listen(f -> assertEquals("All partitions must be handled: " + parts0,
+                    parts0.size(), F.size(parts0.values(), Set::isEmpty)));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }, rqCnt, "rq-creator-");
+
+        runFut.get(TIMEOUT);
+        assertTrue("Not all requested partitions were received", U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS));
+        compFut.markInitialized().get(TIMEOUT);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestEachOther() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        IgniteSnapshotManager mgr0 = snp(ignite);
+        IgniteSnapshotManager mgr1 = snp(grid(1));
+
+        UUID node0 = grid(0).localNode().id();
+        UUID node1 = grid(1).localNode().id();
+
+        Map<Integer, Set<Integer>> fromNode1 = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), node1);
+        Map<Integer, Set<Integer>> fromNode0 = owningParts(grid(1), CU.cacheId(DEFAULT_CACHE_NAME), node0);
+        // Hold back the file request messages until both requests are issued; they are released together below.
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g)
+            .blockMessages((n, msg) -> msg instanceof SnapshotFilesRequestMessage));
+
+        CountDownLatch latch = new CountDownLatch(fromNode1.values().stream().mapToInt(Set::size).sum() +
+            fromNode0.values().stream().mapToInt(Set::size).sum());
+
+        // Snapshot must be taken on node1 and transmitted to node0.
+        IgniteInternalFuture<?> futFrom1To0 = mgr0.requestRemoteSnapshotFiles(node1, SNAPSHOT_NAME, fromNode1, () -> false,
+            defaultPartitionConsumer(fromNode1, latch));
+        IgniteInternalFuture<?> futFrom0To1 = mgr1.requestRemoteSnapshotFiles(node0, SNAPSHOT_NAME, fromNode0, () -> false,
+            defaultPartitionConsumer(fromNode0, latch));
+
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g).stopBlock());
+        // A timed-out await returns false rather than throwing, so the result has to be asserted.
+        assertTrue("Not all requested partitions were received", latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+        futFrom0To1.get(TIMEOUT);
+        futFrom1To0.get(TIMEOUT);
+    }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testRemoteRequestedInitiatorNodeLeft() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+
+ IgniteSnapshotManager mgr1 = snp(grid(1));
+ UUID rmtNodeId = grid(1).localNode().id();
+ UUID locNodeId = grid(0).localNode().id();
+
+ Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), rmtNodeId);
+
+ CountDownLatch sndLatch = new CountDownLatch(1);
+
+ mgr1.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+ @Override public SnapshotSender apply(String s, UUID uuid) {
+ return new DelegateSnapshotSender(log, mgr1.snapshotExecutorService(), mgr1.remoteSnapshotSenderFactory(s, uuid)) {
+ @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+ if (partId(part.getName()) > 0) {
+ try {
+ sndLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ super.sendPart0(part, cacheDirName, pair, length);
+ }
+ };
+ }
+ });
+
+ snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+ SNAPSHOT_NAME,
+ parts,
+ () -> false,
+ (part, t) -> {
+ });
+
+ IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];
+
+ assertTrue(waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ IgniteInternalFuture<?> snpFut = snp(grid(1)).lastScheduledSnapshotResponseRemoteTask(locNodeId);
+
+ if (snpFut == null)
+ return false;
+ else {
+ futs[0] = snpFut;
+
+ return true;
+ }
+ }
+ }, 5_000L));
+
+ stopGrid(0);
+ sndLatch.countDown();
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> futs[0].get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
+ grid(1).localNode().id());
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+ SNAPSHOT_NAME,
+ parts,
+ () -> false,
+ (part, t) -> {
+ if (t == null) {
+ int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+ assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+ assertTrue("Received partition has not been requested",
+ parts.get(grpId).contains(partId(part.getName())));
+
+ try {
+ U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ else {
+ assertTrue(t instanceof ClusterTopologyCheckedException);
+ assertNull(part);
+ }
+ });
+
+ stopGrid(1);
+
+ latch.countDown();
+
+ assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
+ "he node from which a snapshot has been requested left the grid");
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotRequestRemoteCancel() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
+ grid(1).localNode().id());
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean stopChecker = new AtomicBoolean();
+
+ IgniteInternalFuture<Void> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+ SNAPSHOT_NAME,
+ parts,
+ stopChecker::get,
+ (part, t) -> {
+ try {
+ U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException(e);
+ }
+ });
+
+ IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];
+
+ assertTrue(waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ IgniteInternalFuture<?> snpFut = snp(grid(1))
+ .lastScheduledSnapshotResponseRemoteTask(grid(0).localNode().id());
+
+ if (snpFut == null)
+ return false;
+ else {
+ futs[0] = snpFut;
+
+ return true;
+ }
+ }
+ }, 5_000L));
+
+ stopChecker.set(true);
+ latch.countDown();
+
+ assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), TransmissionCancelledException.class,
+ "Future cancelled prior to the all requested partitions processed");
+ }
+
+ /**
+ * @param parts Expected partitions.
+ * @param latch Latch to await partitions processed.
+ * @return Consumer.
+ */
+ private static BiConsumer<File, Throwable> defaultPartitionConsumer(Map<Integer, Set<Integer>> parts, CountDownLatch latch) {
+ return (part, t) -> {
+ assertNull(t);
+
+ int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+ assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+ assertTrue("Received partition has not been requested",
+ parts.get(grpId).remove(partId(part.getName())));
+
+ latch.countDown();
+ };
+ }
+
+ /**
+ * @param src Source node to calculate.
+ * @param grpId Group id to collect OWNING partitions.
+ * @param rmtNodeId Remote node id.
+ * @return Map of collected parts.
+ */
+ private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, int grpId, UUID rmtNodeId) {
+ return Collections.singletonMap(grpId, src.context()
+ .cache()
+ .cacheGroup(grpId)
+ .topology()
+ .partitions(rmtNodeId)
+ .entrySet()
+ .stream()
+ .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet()));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
new file mode 100644
index 0000000..2c532fd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
+ /** Work directory prefix of the nodes on which the dedicated snapshot is taken. */
+ private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+ /** Work directory prefix of the nodes on which the snapshot is restored. */
+ private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+ /** Name of the cache configured with {@link #ZERO_SUFFIX_NODE_FILTER}. */
+ private static final String CACHE_WITH_NODE_FILTER = "cacheWithFilter";
+
+ /** Node filter to test restoring on some nodes only: accepts nodes whose consistent ID ends with "0". */
+ private static final IgnitePredicate<ClusterNode> ZERO_SUFFIX_NODE_FILTER = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return node.consistentId().toString().endsWith("0");
+ }
+ };
+
+ /** {@code true} once the dedicated snapshot has been created and its parts collected (done by the first test set-up). */
+ private static boolean inited;
+
+ /** Snapshot parts on dedicated cluster. Each part has its own local directory. */
+ private static final Set<Path> snpParts = new HashSet<>();
+
+ /** Maps a cluster prefix to a work directory builder: default work directory plus the prefixed, file-name-masked instance name. */
+ private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
+ new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
+ @Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
+ return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
+ prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+ }
+ };
+
+ /** Cache value builder. */
+ private final Function<Integer, Object> valBuilder = String::valueOf;
+
+ /** @throws Exception If fails. */
+ @Before
+ public void prepareDedicatedSnapshot() throws Exception {
+ if (!inited) {
+ cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+
+ CacheConfiguration<Integer, Object> cacheCfg1 =
+ txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+ CacheConfiguration<Integer, Object> cacheCfg2 =
+ txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+ CacheConfiguration<Integer, Object> cacheCfg3 =
+ txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE_WITH_NODE_FILTER))
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 16))
+ .setNodeFilter(ZERO_SUFFIX_NODE_FILTER);
+
+ IgniteEx ignite = startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE, valBuilder,
+ dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2, cacheCfg3);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+ stopAllGrids();
+
+ snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX, SNAPSHOT_NAME));
+
+ inited = true;
+ }
+
+ beforeTestSnapshot();
+ cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+ }
+
+ /** @throws Exception If fails. */
+ @After
+ public void afterSwitchSnapshot() throws Exception {
+ afterTestSnapshot();
+ cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+ }
+
+ /** */
+ @AfterClass
+ public static void cleanupSnapshot() {
+ snpParts.forEach(U::delete);
+ cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testRestoreAllGroups() throws Exception {
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ for (Ignite g : G.allGrids())
+ TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
+ // Restore all cache groups.
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+ awaitPartitionMapExchange(true, true, null, true);
+
+ assertCacheKeys(scc.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ assertCacheKeys(scc.cache(CACHE1), CACHE_KEYS_RANGE);
+ assertCacheKeys(scc.cache(CACHE2), CACHE_KEYS_RANGE);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+ List<Object> msgs = new ArrayList<>();
+
+ for (Ignite g : G.allGrids())
+ msgs.addAll(TestRecordingCommunicationSpi.spi(g).recordedMessages(true));
+
+ assertPartitionsDuplicates(msgs);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testRestoreNoRebalance() throws Exception {
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ for (Ignite g : G.allGrids())
+ TestRecordingCommunicationSpi.spi(g).record(GridDhtPartitionDemandMessage.class);
+
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE_WITH_NODE_FILTER)).get(TIMEOUT);
+
+ awaitPartitionMapExchange(true, true, null, true);
+
+ assertCacheKeys(scc.cache(CACHE_WITH_NODE_FILTER), CACHE_KEYS_RANGE);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+ for (Ignite g : G.allGrids())
+ assertTrue(TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ IgniteSnapshotManager mgr = snp(grid(1));
+ mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+ @Override public SnapshotSender apply(String s, UUID uuid) {
+ return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+ @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+ if (partId(part.getName()) > 0)
+ throw new IgniteException("Test exception. Uploading partition file failed: " + pair);
+
+ super.sendPart0(part, cacheDirName, pair, length);
+ }
+ };
+ }
+ });
+
+ IgniteFuture<?> fut = grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> fut.get(TIMEOUT),
+ IgniteException.class,
+ "Test exception. Uploading partition file failed");
+ assertNull(scc.cache(DEFAULT_CACHE_NAME));
+ ensureCacheAbsent(dfltCacheCfg);
+ }
+
+ /**
+ * @param snpParts Snapshot parts; each one is copied under the snapshot work directory of a single node.
+ * @param toNodes Nodes to copy parts to, picked round-robin in the iteration order of {@code snpParts}.
+ */
+ private static void copyAndShuffle(Set<Path> snpParts, List<Ignite> toNodes) {
+ AtomicInteger cnt = new AtomicInteger();
+
+ snpParts.forEach(p -> {
+ try {
+ IgniteEx loc = (IgniteEx)toNodes.get(cnt.getAndIncrement() % toNodes.size());
+ String snpName = p.getFileName().toString();
+
+ U.copy(p.toFile(),
+ Paths.get(resolveSnapshotWorkDirectory(loc.configuration()).getAbsolutePath(), snpName).toFile(),
+ false);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ });
+ }
+
+ /**
+ * @param clusterPrefix Array of prefixes to clean up directories.
+ */
+ private static void cleanupDedicatedPersistenceDirs(String... clusterPrefix) {
+ for (String prefix : clusterPrefix) {
+ try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+ path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+ ) {
+ for (Path dir : ds)
+ U.delete(dir);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * @return Paths found by {@code searchDirectoryRecursively(dir, snpName)}, one per default-work-directory subdirectory whose lower-cased name starts with {@code prefix}.
+ */
+ private static Set<Path> findSnapshotParts(String prefix, String snpName) {
+ Set<Path> snpPaths = new HashSet<>();
+
+ try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+ path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+ ) {
+ for (Path dir : ds)
+ snpPaths.add(searchDirectoryRecursively(dir, snpName)
+ .orElseThrow(() -> new IgniteException("Snapshot not found in the Ignite work directory " +
+ "[dir=" + dir.toString() + ", snpName=" + snpName + ']')));
+
+ return snpPaths;
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param grids Number of ignite instances to start.
+ * @param keys Number of keys to create.
+ * @param valMapper Factory which produces values.
+ * @param <V> Cache value type.
+ * @return Ignite coordinator instance.
+ * @throws Exception If fails.
+ */
+ private <V> IgniteEx startDedicatedGridsWithCache(
+ String prefix,
+ int grids,
+ int keys,
+ Function<Integer, V> valMapper,
+ CacheConfiguration<Integer, V>... ccfgs
+ ) throws Exception {
+ return startGridsWithCache(grids,
+ keys,
+ valMapper,
+ CLUSTER_DIR.apply(prefix),
+ ccfgs);
+ }
+
+ /**
+ * @param grids Number of ignite instances to start.
+ * @return Ignite coordinator instance.
+ * @throws Exception If fails.
+ */
+ private IgniteEx startDedicatedGrids(String prefix, int grids) throws Exception {
+ for (int g = 0; g < grids; g++)
+ startDedicatedGrid(prefix, g);
+
+ grid(0).events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT);
+
+ return grid(0);
+ }
+
+ /**
+ * @param prefix Grid work directory prefix.
+ * @param id Grid index.
+ * @return Grid instance.
+ * @throws Exception If fails.
+ */
+ private IgniteEx startDedicatedGrid(String prefix, int id) throws Exception {
+ IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(id)));
+ cfg.setWorkDirectory(CLUSTER_DIR.apply(prefix).apply(id, cfg));
+
+ return startGrid(cfg);
+ }
+
+ /**
+ * @return Default work directory.
+ */
+ private static Path defaultWorkDirectory() {
+ try {
+ return Paths.get(U.defaultWorkDirectory());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** Asserts that no group partition is listed more than once across the given {@link SnapshotFilesRequestMessage}s. */
+ private static void assertPartitionsDuplicates(List<Object> msgs) {
+ List<GroupPartitionId> all = new ArrayList<>();
+
+ for (Object o : msgs) {
+ SnapshotFilesRequestMessage msg0 = (SnapshotFilesRequestMessage)o;
+ Map<Integer, Set<Integer>> parts = msg0.parts();
+
+ for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+ for (Integer partId : e.getValue())
+ all.add(new GroupPartitionId(e.getKey(), partId));
+ }
+ }
+
+ assertEquals(all.size(), new HashSet<>(all).size());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 402f2f7..12dc32d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
import org.apache.ignite.internal.processors.performancestatistics.CacheStartTest;
import org.apache.ignite.internal.processors.performancestatistics.CheckpointTest;
@@ -101,12 +103,14 @@ import org.junit.runners.Suite;
IgniteSnapshotManagerSelfTest.class,
IgniteClusterSnapshotSelfTest.class,
+ IgniteSnapshotRemoteRequestTest.class,
IgniteClusterSnapshotCheckTest.class,
IgniteSnapshotWithMetastorageTest.class,
IgniteSnapshotMXBeanTest.class,
IgniteClusterSnapshotRestoreSelfTest.class,
IgniteClusterSnapshotHandlerTest.class,
EncryptedSnapshotTest.class,
+ IgniteSnapshotRestoreFromRemoteTest.class,
IgniteClusterIdTagTest.class,
FullyConnectedComponentSearcherTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
index 11f5f8c..7db3ff6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -35,11 +35,14 @@ import org.apache.ignite.events.SnapshotEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static java.util.Optional.ofNullable;
import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
@@ -54,9 +57,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
/** Number of cache keys to pre-create at node start. */
private static final int CACHE_KEYS_RANGE = 10_000;
- /** Cache value builder. */
- private Function<Integer, Object> valBuilder = new BinaryValueBuilder(TYPE_NAME);
-
/** {@inheritDoc} */
@Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
return super.txCacheConfig(ccfg).setSqlIndexMaxInlineSize(255).setSqlSchema("PUBLIC")
@@ -67,11 +67,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
.setIndexes(Collections.singletonList(new QueryIndex("id")))));
}
- /** {@inheritDoc} */
- @Override protected Function<Integer, Object> valueBuilder() {
- return valBuilder;
- }
-
/** @throws Exception If failed. */
@Test
public void testBasicClusterSnapshotRestore() throws Exception {
@@ -82,6 +77,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ assertRebuildIndexes(client.cache(DEFAULT_CACHE_NAME), false);
waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
}
@@ -89,6 +85,8 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
/** @throws Exception If failed. */
@Test
public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception {
+ valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
// Remove metadata.
@@ -101,6 +99,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+ assertRebuildIndexes(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), false);
for (Ignite grid : G.allGrids())
assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId));
@@ -111,9 +110,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
/** @throws Exception If failed. */
@Test
public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
+ valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
int nodesCnt = 4;
- startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+ startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
@@ -146,7 +147,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
awaitPartitionMapExchange();
- assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+ for (Ignite g : G.allGrids())
+ ofNullable(indexRebuildFuture((IgniteEx)g, CU.cacheId(DEFAULT_CACHE_NAME))).orElse(new GridFinishedFuture<>()).get(TIMEOUT);
+
+ for (Ignite g : G.allGrids())
+ assertCacheKeys(g.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
GridTestUtils.waitForCondition(() -> evts.size() == 2, TIMEOUT);
assertEquals(2, evts.size());
@@ -168,12 +173,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
String nodeId = ctx.localNodeId().toString();
- assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone());
-
- // Make sure no index rebuild happened.
- assertEquals("nodeId=" + nodeId,
- 0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed());
-
GridQueryProcessor qry = ((IgniteEx)grid).context().query();
// Make sure SQL works fine.
@@ -188,6 +187,23 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
}
}
+ /**
+ * @param cache Ignite cache whose indexes are checked on every grid returned by {@code G.allGrids()}.
+ * @param rebuild {@code true} if an index rebuild is expected to have happened, {@code false} otherwise.
+ */
+ private void assertRebuildIndexes(IgniteCache<Object, Object> cache, boolean rebuild) {
+ for (Ignite grid : G.allGrids()) {
+ GridKernalContext ctx = ((IgniteEx)grid).context();
+
+ assertTrue("nodeId=" + ctx.localNodeId(), grid.cache(cache.getName()).indexReadyFuture().isDone());
+
+ // Index rebuild keys must have been processed on this node if and only if a rebuild is expected.
+ assertEquals("nodeId=" + ctx.localNodeId(),
+ rebuild, ctx.cache().cache(cache.getName()).context().cache().metrics0()
+ .getIndexRebuildKeysProcessed() > 0);
+ }
+ }
+
/** */
private static class IndexedValueBuilder implements Function<Integer, Object> {
/** {@inheritDoc} */