You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/25 14:01:35 UTC
[14/14] ignite git commit: ignite-6667
ignite-6667
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46501d92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46501d92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46501d92
Branch: refs/heads/ignite-6667
Commit: 46501d92b7c07c1bdeb86c27a6f0e170adb10f12
Parents: 7c8ffc4
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 25 16:52:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 25 17:01:10 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 31 ++---
.../discovery/DiscoveryCustomMessage.java | 10 +-
.../discovery/GridDiscoveryManager.java | 132 +++++--------------
.../cache/CacheAffinityChangeMessage.java | 7 +-
.../ClientCacheChangeDiscoveryMessage.java | 5 +-
.../ClientCacheChangeDummyDiscoveryMessage.java | 5 +-
.../cache/DynamicCacheChangeBatch.java | 11 +-
.../binary/MetadataUpdateAcceptedMessage.java | 4 +-
.../binary/MetadataUpdateProposedMessage.java | 4 +-
.../cluster/ChangeGlobalStateFinishMessage.java | 7 +-
.../cluster/ChangeGlobalStateMessage.java | 9 +-
.../continuous/AbstractContinuousMessage.java | 5 +-
.../marshaller/MappingAcceptedMessage.java | 4 +-
.../marshaller/MappingProposedMessage.java | 5 +-
.../message/SchemaAbstractDiscoveryMessage.java | 4 +-
.../IgniteDiscoveryCacheReuseSelfTest.java | 28 +++-
16 files changed, 106 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 32aa23b..9ed70aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -17,6 +17,12 @@
package org.apache.ignite.internal.managers.discovery;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -29,13 +35,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
/**
*
*/
@@ -95,6 +94,7 @@ public class DiscoCache {
* @param cacheGrpAffNodes Affinity nodes by cache group ID.
* @param nodeMap Node map.
* @param alives Alive nodes.
+ * @param minNodeVer Minimum node version.
*/
DiscoCache(
AffinityTopologyVersion topVer,
@@ -109,7 +109,7 @@ public class DiscoCache {
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<UUID> alives,
- IgniteProductVersion minVer) {
+ IgniteProductVersion minNodeVer) {
this.topVer = topVer;
this.state = state;
this.loc = loc;
@@ -122,7 +122,7 @@ public class DiscoCache {
this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
this.alives.addAll(alives);
- minNodeVer = minVer;
+ this.minNodeVer = minNodeVer;
}
/**
@@ -316,17 +316,14 @@ public class DiscoCache {
}
/**
- * Returns copy of discovery cache suitable for further reuse.
- *
- * @param ver Version.
- * @param st State.
- *
- * @return Copy.
+ * @param ver Topology version.
+ * @param state Not {@code null} state if need override state, otherwise current state is used.
+ * @return Copy of discovery cache with new version.
*/
- public DiscoCache copy(AffinityTopologyVersion ver, @Nullable DiscoveryDataClusterState st) {
+ public DiscoCache copy(AffinityTopologyVersion ver, @Nullable DiscoveryDataClusterState state) {
return new DiscoCache(
ver,
- st == null ? state : st,
+ state == null ? this.state : state,
loc,
rmtNodes,
allNodes,
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 2f4c5fe..c708c62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.managers.discovery;
import java.io.Serializable;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -93,15 +92,12 @@ public interface DiscoveryCustomMessage extends Serializable {
public boolean isMutable();
/**
- * Reuses current discovery cache in most efficient way for given message type.
+ * Creates new discovery cache if message caused topology version change.
*
- * @param ctx Context.
+ * @param mgr Discovery manager.
* @param topVer New topology version.
* @param discoCache Current discovery cache.
* @return Reused discovery cache.
- *
- * @throws UnsupportedOperationException If message doesn't support discovery cache reuse.
*/
- public DiscoCache reuseDiscoCache(GridKernalContext ctx, AffinityTopologyVersion topVer,
- DiscoCache discoCache) throws UnsupportedOperationException;
+ public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 03d46de..ad32116 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -69,7 +69,6 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -78,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
@@ -603,7 +601,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (snapshots != null)
topHist = snapshots;
- boolean verChanged, incMinorTopVer = false;
+ boolean verChanged;
if (type == EVT_NODE_METRICS_UPDATED)
verChanged = false;
@@ -627,32 +625,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
updateClientNodes(node.id());
}
- DiscoCache discoCache = null;
-
boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
- IgniteInternalFuture<Boolean> transitionWaitFut = null;
-
ChangeGlobalStateFinishMessage stateFinishMsg = null;
- if (locJoinEvt) {
- discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
- ctx.state().clusterState(),
- locNode,
- topSnapshot);
-
- transitionWaitFut = ctx.state().onLocalJoin(discoCache);
- }
- else if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)
+ if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT)
stateFinishMsg = ctx.state().onNodeLeft(node);
final AffinityTopologyVersion nextTopVer;
- final Snapshot snapshot = topSnap.get();
-
if (type == EVT_DISCOVERY_CUSTOM_EVT) {
assert customMsg != null;
+ boolean incMinorTopVer;
+
if (customMsg instanceof ChangeGlobalStateMessage) {
incMinorTopVer = ctx.state().onStateChangeMessage(
new AffinityTopologyVersion(topVer, minorTopVer),
@@ -660,12 +646,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoCache());
}
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
- final ChangeGlobalStateFinishMessage finishMsg = (ChangeGlobalStateFinishMessage)customMsg;
+ ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
+
+ Snapshot snapshot = topSnap.get();
- ctx.state().onStateFinishMessage(finishMsg);
+ // Topology version does not change, but need create DiscoCache with new state.
+ DiscoCache discoCache = snapshot.discoCache.copy(snapshot.topVer, ctx.state().clusterState());
- topSnap.set(new Snapshot(snapshot.topVer, (discoCache =
- finishMsg.reuseDiscoCache(ctx, snapshot.topVer, snapshot.discoCache))));
+ topSnap.set(new Snapshot(snapshot.topVer, discoCache));
+
+ incMinorTopVer = false;
}
else {
incMinorTopVer = ctx.cache().onCustomEvent(
@@ -704,18 +694,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
+ DiscoCache discoCache;
+
// Put topology snapshot into discovery history.
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
- if (discoCache == null) {
- discoCache = incMinorTopVer ?
- customMsg.reuseDiscoCache(ctx, nextTopVer, snapshot.discoCache) :
- createDiscoCache(nextTopVer,
- ctx.state().clusterState(),
- locNode,
- topSnapshot);
+ Snapshot snapshot = topSnap.get();
+
+ if (customMsg == null) {
+ discoCache = createDiscoCache(nextTopVer,
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
}
+ else
+ discoCache = customMsg.createDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache);
discoCacheHist.put(nextTopVer, discoCache);
@@ -736,10 +730,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (gridStartTime == 0)
gridStartTime = getSpi().getGridStartTime();
- final AffinityTopologyVersion locTopVer = new AffinityTopologyVersion(locNode.order());
-
- if (topSnap.get().topVer.compareTo(locTopVer) < 0)
- topSnap.set(new Snapshot(locTopVer, discoCache));
+ topSnap.set(new Snapshot(nextTopVer, discoCache));
startLatch.countDown();
@@ -756,6 +747,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (!isLocDaemon && !ctx.clientDisconnected())
ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
+ IgniteInternalFuture<Boolean> transitionWaitFut = ctx.state().onLocalJoin(discoCache);
+
locJoin.onDone(new DiscoveryLocalJoinData(discoEvt,
discoCache,
transitionWaitFut,
@@ -2222,6 +2215,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+ IgniteProductVersion minVer = null;
+
for (ClusterNode node : topSnapshot) {
if (alive(node))
alives.add(node.id());
@@ -2239,6 +2234,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
nodeMap.put(node.id(), node);
+
+ if (minVer == null)
+ minVer = node.version();
+ else if (node.version().compareTo(minVer) < 0)
+ minVer = node.version();
}
assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
@@ -2250,17 +2250,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches);
- IgniteProductVersion minVer = null;
-
- for (int i = 0; i < allNodes.size(); i++) {
- ClusterNode node = allNodes.get(i);
-
- if (minVer == null)
- minVer = node.version();
- else if (node.version().compareTo(minVer) < 0)
- minVer = node.version();
- }
-
return new DiscoCache(
topVer,
state,
@@ -3070,13 +3059,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Reuses discovery cache for {@link DynamicCacheChangeBatch}
+ * Creates discovery cache after {@link DynamicCacheChangeBatch} received.
*
- * @param msg Message.
* @param topVer Topology version.
- * @param discoCache Disco cache.
+ * @param discoCache Current disco cache.
*/
- public DiscoCache reuseDiscoCache(DynamicCacheChangeBatch msg, AffinityTopologyVersion topVer,
+ public DiscoCache createDiscoCacheOnCacheChange(AffinityTopologyVersion topVer,
DiscoCache discoCache) {
List<ClusterNode> allNodes = discoCache.allNodes();
Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
@@ -3100,52 +3088,4 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoCache.alives,
discoCache.minimumNodeVersion());
}
-
- /**
- * Reuses discovery cache for {@link CacheAffinityChangeMessage}
- *
- * @param msg Message.
- * @param topVer Topology version.
- * @param discoCache Disco cache.
- */
- public DiscoCache reuseDiscoCache(CacheAffinityChangeMessage msg, AffinityTopologyVersion topVer,
- DiscoCache discoCache) {
- return discoCache.copy(topVer, null);
- }
-
- /**
- * Reuses discovery cache for {@link SnapshotDiscoveryMessage}
- *
- * @param msg Message.
- * @param topVer Topology version.
- * @param discoCache Disco cache.
- */
- public DiscoCache reuseDiscoCache(SnapshotDiscoveryMessage msg, AffinityTopologyVersion topVer,
- DiscoCache discoCache) {
- return discoCache.copy(topVer, null);
- }
-
- /**
- * Reuses discovery cache for {@link ChangeGlobalStateMessage}
- *
- * @param msg Message.
- * @param topVer Topology version.
- * @param discoCache Disco cache.
- */
- public DiscoCache reuseDiscoCache(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer,
- DiscoCache discoCache) {
- return discoCache.copy(topVer, ctx.state().clusterState());
- }
-
- /**
- * Reuses discovery cache for {@link ChangeGlobalStateFinishMessage}
- *
- * @param msg Message.
- * @param topVer Topology version.
- * @param discoCache Disco cache.
- */
- public DiscoCache reuseDiscoCache(ChangeGlobalStateFinishMessage msg, AffinityTopologyVersion topVer,
- DiscoCache discoCache) {
- return discoCache.copy(topVer, ctx.state().clusterState());
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 2897c55..fe1014c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -20,10 +20,9 @@ package org.apache.ignite.internal.processors.cache;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -157,9 +156,9 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
- return ctx.discovery().reuseDiscoCache(this, topVer, discoCache);
+ return discoCache.copy(topVer, null);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index 22e40c8..e35d80e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -22,10 +22,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -174,7 +173,7 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index afa2081..6ed3ecc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -20,10 +20,9 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,7 +105,7 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index d266262..83459a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -19,10 +19,9 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Set;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -78,9 +77,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
- AffinityTopologyVersion topVer, DiscoCache discoCache) {
- return ctx.discovery().reuseDiscoCache(this, topVer, discoCache);
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
}
/**
@@ -147,4 +146,4 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
index 8a78657..0416746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -16,9 +16,9 @@
*/
package org.apache.ignite.internal.processors.cache.binary;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -71,7 +71,7 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index 50c9628..f9bd660 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -18,11 +18,11 @@ package org.apache.ignite.internal.processors.cache.binary;
import java.util.UUID;
import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -134,7 +134,7 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
index 7f094c7..a1fbacf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.cluster;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -84,9 +83,9 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
- return ctx.discovery().reuseDiscoCache(this, topVer, discoCache);
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 2507ec9..22e376f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -19,10 +19,9 @@ package org.apache.ignite.internal.processors.cluster;
import java.util.List;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
@@ -117,9 +116,9 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
- AffinityTopologyVersion topVer, DiscoCache discoCache) {
- return ctx.discovery().reuseDiscoCache(this, topVer, discoCache);
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ return discoCache.copy(topVer, null);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index cdf3c1d..e9754d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.continuous;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -64,7 +63,7 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
index c47aa27..7af0559 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.marshaller;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -63,7 +63,7 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
index 8cf8e61..b4e13fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.marshaller;
import java.util.UUID;
-
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -99,7 +98,7 @@ public class MappingProposedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index e567fdc..f55eae0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.query.schema.message;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -56,7 +56,7 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache reuseDiscoCache(GridKernalContext ctx,
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46501d92/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
index 034bfa8..c238a9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/IgniteDiscoveryCacheReuseSelfTest.java
@@ -18,20 +18,38 @@
package org.apache.ignite.spi.discovery;
import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
* Tests discovery cache reuse between topology events.
*/
public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
/**
* Tests correct reuse of discovery cache.
+ *
+ * @throws Exception If failed.
*/
public void testDiscoCacheReuseOnNodeJoin() throws Exception {
startGridsMultiThreaded(2);
@@ -45,11 +63,10 @@ public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest {
* @param v1 First version.
* @param v2 Next version.
*/
- private void assertDiscoCacheReuse(AffinityTopologyVersion v1,
- AffinityTopologyVersion v2) {
+ private void assertDiscoCacheReuse(AffinityTopologyVersion v1, AffinityTopologyVersion v2) {
for (Ignite ignite : G.allGrids()) {
GridBoundedConcurrentLinkedHashMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
- U.field(((IgniteEx) ignite).context().discovery(), "discoCacheHist");
+ U.field(((IgniteEx) ignite).context().discovery(), "discoCacheHist");
DiscoCache discoCache1 = discoCacheHist.get(v1);
DiscoCache discoCache2 = discoCacheHist.get(v2);
@@ -58,8 +75,8 @@ public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest {
assertEquals(v2, discoCache2.version());
String[] props = new String[] {
- "state", "loc", "rmtNodes", "allNodes", "srvNodes", "daemonNodes", "rmtNodesWithCaches",
- "allCacheNodes", "allCacheNodes", "cacheGrpAffNodes", "nodeMap", "minNodeVer"
+ "state", "loc", "rmtNodes", "allNodes", "srvNodes", "daemonNodes", "rmtNodesWithCaches",
+ "allCacheNodes", "allCacheNodes", "cacheGrpAffNodes", "nodeMap", "minNodeVer"
};
for (String prop : props)
@@ -67,7 +84,6 @@ public class IgniteDiscoveryCacheReuseSelfTest extends GridCommonAbstractTest {
assertNotSame(U.field(discoCache1, "alives"), U.field(discoCache2, "alives"));
assertEquals(U.field(discoCache1, "alives"), U.field(discoCache2, "alives"));
-
}
}
}
\ No newline at end of file