You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:05 UTC
[5/6] ignite git commit: Done.
Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb
Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:02:15 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 25 ++++++++++++++------
.../processors/cache/GridCacheProcessor.java | 10 ++++++++
2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
- if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+ if (customMsg instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
- else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
- CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+ else if (customMsg instanceof CacheAffinityChangeMessage) {
+ CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
- else
- exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ else {
+ exchangeFuture(msg.exchangeId(), null, null, null, null)
+ .onAffinityChangeMessage(evt.eventNode(), msg);
+ }
+ }
+ else {
+ // Process event as custom discovery task if needed.
+ CachePartitionExchangeWorkerTask task =
+ cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+ if (task != null)
+ exchWorker.addCustomTask(task);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Create exchange worker task for custom discovery message.
+ *
+ * @param msg Custom discovery message.
+ * @return Task or {@code null} if message doesn't require any special processing.
+ */
+ public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+ return null;
+ }
+
+ /**
* Process custom exchange task.
*
* @param task Task.