You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 07:21:54 UTC
[16/50] [abbrv] ignite git commit: GG-11860 Implement snapshot status
on platform level -refactoring RESTORE
GG-11860 Implement snapshot status on platform level
-refactoring RESTORE
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62cc454
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62cc454
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62cc454
Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: a62cc454466f921e2edd26bb0a6e0646bdb3a7f1
Parents: ef35f4f
Author: EdShangGG <es...@gridgain.com>
Authored: Thu Mar 2 16:33:58 2017 +0300
Committer: EdShangGG <es...@gridgain.com>
Committed: Thu Mar 2 16:34:27 2017 +0300
----------------------------------------------------------------------
.../pagemem/snapshot/SnapshotOperation.java | 3 +-
.../GridDhtPartitionsExchangeFuture.java | 119 +++++++++++++++++--
2 files changed, 109 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
index 3f84b97..f3b5eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
@@ -105,7 +105,8 @@ public class SnapshotOperation implements Serializable {
* @param op Op.
*/
public static Collection<File> getOptionalPathsParameter(SnapshotOperation op) {
- assert op.type() == SnapshotOperationType.CHECK || op.extraParameter() instanceof Collection;
+ assert (op.type() == SnapshotOperationType.CHECK || op.type() == SnapshotOperationType.RESTORE)
+ && (op.extraParameter() == null || op.extraParameter() instanceof Collection);
return (Collection<File>)op.extraParameter();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 987ba54..4c179e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType;
import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -59,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -510,10 +514,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchange = onCacheChangeRequest(crdNode);
}
- else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage)
+ else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage) {
exchange = CU.clientNode(discoEvt.eventNode()) ?
onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
+
+ StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg;
+
+ if (!cctx.localNode().isDaemon()) {
+ SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
+
+ if (op.type() == SnapshotOperationType.RESTORE) {
+ if (reqs != null)
+ reqs = new ArrayList<>(reqs);
+ else
+ reqs = new ArrayList<>();
+
+ List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests(
+ cctx.cache(), op.cacheNames(), cctx.localNodeId());
+
+ reqs.addAll(destroyRequests);
+
+ if (!reqs.isEmpty()) { //Emulate destroy cache request
+ if (op.type() == SnapshotOperationType.RESTORE)
+ cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(reqs), topVer);
+
+ onCacheChangeRequest(crdNode);
+ }
+ }
+ }
+ }
else {
assert affChangeMsg != null : this;
@@ -578,6 +608,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @param cache Cache processor queried for the descriptor of each named cache.
+ * @param cacheNames Names of the caches to create requests for; names without a descriptor are skipped.
+ * @param locNodeId Node ID passed to the constructor of every created request.
+ */
+ @NotNull public static List<DynamicCacheChangeRequest> getStopCacheRequests(GridCacheProcessor cache,
+ Set<String> cacheNames, UUID locNodeId) {
+ List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>();
+ // Build one request per requested cache name that has a registered descriptor; the list may stay empty.
+ for (String cacheName : cacheNames) {
+ DynamicCacheDescriptor desc = cache.cacheDescriptor(CU.cacheId(cacheName));
+ // The descriptor is looked up by the cache ID derived from the name.
+ if (desc == null)
+ continue;
+
+ DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, locNodeId);
+ // Mark the request as both a stop and a destroy.
+ t.stop(true);
+ t.destroy(true);
+ // Carry over the deployment ID from the existing descriptor.
+ t.deploymentId(desc.deploymentId());
+ // NOTE(review): restart flag presumably lets the cache be started again after the restore - confirm.
+ t.restart(true);
+
+ destroyRequests.add(t);
+ }
+
+ return destroyRequests;
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
private void initTopologies() throws IgniteCheckedException {
@@ -806,19 +866,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.database().beforeExchange(this);
- // If a backup request, synchronously wait for backup start.
- if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
- DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
-
- if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage) {
- StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+ StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
- if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
- IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+ // If it's a snapshot operation request, synchronously wait for backup start.
+ if (snapshotOperationMsg != null) {
+ if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
+ SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
- if (fut != null)
- fut.get();
- }
+ if (op.type() != SnapshotOperationType.RESTORE)
+ startLocalSnasphotOperation(snapshotOperationMsg);
}
}
@@ -833,6 +889,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @param snapshotOperationMsg Message handed to the database manager's {@code startLocalSnapshotOperation}; the returned future, if any, is waited on.
+ */
+ private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg
+ ) throws IgniteCheckedException {
+ IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+ // A null future is tolerated: in that case there is nothing to wait for.
+ if (fut != null)
+ fut.get();
+ }
+
+ /**
* @throws IgniteCheckedException If failed.
*/
private void waitPartitionRelease() throws IgniteCheckedException {
@@ -1168,6 +1235,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.cache().completeStartFuture(req);
}
+ StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
+
+ if (snapshotOperationMsg != null && !cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
+ SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
+
+ if (op.type() == SnapshotOperationType.RESTORE)
+ try {
+ startLocalSnasphotOperation(snapshotOperationMsg);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Error while starting snapshot operation", e);
+ }
+ }
+
if (exchangeOnChangeGlobalState && err == null)
cctx.kernalContext().state().onExchangeDone();
@@ -1196,6 +1277,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return dummy;
}
+ /**
+ * @return Snapshot operation message carried by {@code discoEvt}, or {@code null} if the event is not a custom event holding such a message.
+ */
+ private StartSnapshotOperationAckDiscoveryMessage getSnapshotOperationMessage() {
+ // Only custom discovery events carry a custom message that can be inspected.
+ if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+ // Any other kind of custom message falls through to the null return below.
+ if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage)
+ return (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+ }
+ return null;
+ }
+
/** {@inheritDoc} */
@Nullable @Override public Throwable validateCache(
GridCacheContext cctx,