You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 07:21:54 UTC

[16/50] [abbrv] ignite git commit: GG-11860 Implement snapshot status on platform level -refactoring RESTORE

GG-11860 Implement snapshot status on platform level
-refactoring RESTORE


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62cc454
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62cc454
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62cc454

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: a62cc454466f921e2edd26bb0a6e0646bdb3a7f1
Parents: ef35f4f
Author: EdShangGG <es...@gridgain.com>
Authored: Thu Mar 2 16:33:58 2017 +0300
Committer: EdShangGG <es...@gridgain.com>
Committed: Thu Mar 2 16:34:27 2017 +0300

----------------------------------------------------------------------
 .../pagemem/snapshot/SnapshotOperation.java     |   3 +-
 .../GridDhtPartitionsExchangeFuture.java        | 119 +++++++++++++++++--
 2 files changed, 109 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
index 3f84b97..f3b5eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
@@ -105,7 +105,8 @@ public class SnapshotOperation implements Serializable {
      * @param op Op.
      */
     public static Collection<File> getOptionalPathsParameter(SnapshotOperation op) {
-        assert op.type() == SnapshotOperationType.CHECK || op.extraParameter() instanceof Collection;
+        assert (op.type() == SnapshotOperationType.CHECK || op.type() == SnapshotOperationType.RESTORE)
+            && (op.extraParameter() == null || op.extraParameter() instanceof Collection);
 
         return (Collection<File>)op.extraParameter();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 987ba54..4c179e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -59,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -510,10 +514,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                     exchange = onCacheChangeRequest(crdNode);
                 }
-                else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage)
+                else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage) {
                     exchange = CU.clientNode(discoEvt.eventNode()) ?
                         onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
+
+                    StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg;
+
+                    if (!cctx.localNode().isDaemon()) {
+                        SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
+
+                        if (op.type() == SnapshotOperationType.RESTORE) {
+                            if (reqs != null)
+                                reqs = new ArrayList<>(reqs);
+                            else
+                                reqs = new ArrayList<>();
+
+                            List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests(
+                                cctx.cache(), op.cacheNames(), cctx.localNodeId());
+
+                            reqs.addAll(destroyRequests);
+
+                            if (!reqs.isEmpty()) { //Emulate destroy cache request
+                                if (op.type() == SnapshotOperationType.RESTORE)
+                                    cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(reqs), topVer);
+
+                                onCacheChangeRequest(crdNode);
+                            }
+                        }
+                    }
+                }
                 else {
                     assert affChangeMsg != null : this;
 
@@ -578,6 +608,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param cache Cache processor used to look up the descriptor of each named cache.
+     * @param cacheNames Names of caches to create stop requests for; names without a descriptor are skipped.
+     * @param locNodeId Local node id passed to every created request.
+     */
+    @NotNull public static List<DynamicCacheChangeRequest> getStopCacheRequests(GridCacheProcessor cache,
+        Set<String> cacheNames, UUID locNodeId) {
+        List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>();
+
+        for (String cacheName : cacheNames) {
+            DynamicCacheDescriptor desc = cache.cacheDescriptor(CU.cacheId(cacheName));
+
+            if (desc == null)
+                continue; // No descriptor found for this name, so no request is created for it.
+
+            DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, locNodeId);
+
+            t.stop(true);
+            t.destroy(true);
+
+            t.deploymentId(desc.deploymentId());
+
+            t.restart(true);
+
+            destroyRequests.add(t);
+        }
+
+        return destroyRequests;
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void initTopologies() throws IgniteCheckedException {
@@ -806,19 +866,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         cctx.database().beforeExchange(this);
 
-        // If a backup request, synchronously wait for backup start.
-        if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
-
-            if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage) {
-                StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
 
-                if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
-                    IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+        // For a snapshot operation other than RESTORE, synchronously wait for its local start.
+        if (snapshotOperationMsg != null) {
+            if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
+                SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
 
-                    if (fut != null)
-                        fut.get();
-                }
+                if (op.type() != SnapshotOperationType.RESTORE)
+                    startLocalSnasphotOperation(snapshotOperationMsg);
             }
         }
 
@@ -833,6 +889,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param snapshotOperationMsg Snapshot operation message to start locally; waits on the returned future, if any.
+     */
+    private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg
+    ) throws IgniteCheckedException {
+        IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+
+        if (fut != null)
+            fut.get(); // Block the caller until the returned future completes.
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void waitPartitionRelease() throws IgniteCheckedException {
@@ -1168,6 +1235,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 cctx.cache().completeStartFuture(req);
         }
 
+        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
+
+        if (snapshotOperationMsg != null && !cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
+            SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
+
+            if (op.type() == SnapshotOperationType.RESTORE)
+                try {
+                    startLocalSnasphotOperation(snapshotOperationMsg);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Error while starting snapshot operation", e);
+                }
+        }
+
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
 
@@ -1196,6 +1277,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         return dummy;
     }
 
+    /**
+     * @return Snapshot operation message carried by the discovery event or {@code null} if there is none.
+     */
+    private StartSnapshotOperationAckDiscoveryMessage getSnapshotOperationMessage() {
+        // Only a custom discovery event can carry a snapshot operation message.
+        if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+
+            if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage)
+                return  (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+        }
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public Throwable validateCache(
         GridCacheContext cctx,