You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:05 UTC

[5/6] ignite git commit: Done.

Done.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb

Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:02:15 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 25 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java    | 10 ++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
-                        else
-                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                        else {
+                            exchangeFuture(msg.exchangeId(), null, null, null, null)
+                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                        }
+                    }
+                    else {
+                        // Process event as custom discovery task if needed.
+                        CachePartitionExchangeWorkerTask task =
+                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                        if (task != null)
+                            exchWorker.addCustomTask(task);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message. Currently always returns {@code null}.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special processing.
+     */
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
      * Process custom exchange task.
      *
      * @param task Task.