You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 10:22:32 UTC
[09/10] ignite git commit: ignite-6124 Merge exchanges for multiple
discovery events
ignite-6124 Merge exchanges for multiple discovery events
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bebf2997
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bebf2997
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bebf2997
Branch: refs/heads/master
Commit: bebf299799712b464ee0e3800752ecc07770d9f0
Parents: b8b8064
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 21 13:21:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 21 13:21:45 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../java/org/apache/ignite/TestDebugLog.java | 219 ++
.../internal/IgniteDiagnosticMessage.java | 2 +-
.../communication/GridIoMessageFactory.java | 9 +-
.../internal/managers/discovery/DiscoCache.java | 79 +-
.../discovery/GridDiscoveryManager.java | 28 +-
.../affinity/AffinityTopologyVersion.java | 7 +
.../affinity/GridAffinityAssignmentCache.java | 42 +
.../affinity/GridAffinityProcessor.java | 8 +-
.../cache/CacheAffinitySharedManager.java | 695 ++++--
.../processors/cache/CacheGroupContext.java | 18 +-
.../cache/CachePartitionExchangeWorkerTask.java | 5 +-
.../ClientCacheChangeDummyDiscoveryMessage.java | 5 +
.../cache/ClientCacheUpdateTimeout.java | 5 +
.../processors/cache/ClusterCachesInfo.java | 22 +-
.../processors/cache/ExchangeContext.java | 131 ++
.../cache/ExchangeDiscoveryEvents.java | 262 +++
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 57 +-
.../processors/cache/GridCacheMapEntry.java | 6 +-
.../GridCachePartitionExchangeManager.java | 413 +++-
.../processors/cache/GridCacheProcessor.java | 14 +-
.../dht/ClientCacheDhtTopologyFuture.java | 12 +-
.../dht/GridClientPartitionTopology.java | 130 +-
.../distributed/dht/GridDhtCacheAdapter.java | 18 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/GridDhtPartitionTopology.java | 37 +-
.../dht/GridDhtPartitionTopologyImpl.java | 492 +++--
.../dht/GridDhtPartitionsReservation.java | 2 +-
.../distributed/dht/GridDhtTopologyFuture.java | 36 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 95 +-
.../dht/GridPartitionedGetFuture.java | 4 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 31 +-
.../GridNearAtomicSingleUpdateFuture.java | 1 -
.../colocated/GridDhtColocatedLockFuture.java | 2 +-
.../preloader/CacheGroupAffinityMessage.java | 339 +++
.../preloader/ForceRebalanceExchangeTask.java | 5 +
.../preloader/GridDhtPartitionExchangeId.java | 11 +
.../dht/preloader/GridDhtPartitionMap.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 2 +-
.../GridDhtPartitionsAbstractMessage.java | 37 +-
.../GridDhtPartitionsExchangeFuture.java | 1976 +++++++++++++-----
.../preloader/GridDhtPartitionsFullMessage.java | 161 +-
.../GridDhtPartitionsSingleMessage.java | 78 +-
.../GridDhtPartitionsSingleRequest.java | 47 +-
.../dht/preloader/GridDhtPreloader.java | 44 +-
.../IgniteDhtPartitionCountersMap.java | 7 +
.../dht/preloader/InitNewCoordinatorFuture.java | 307 +++
.../RebalanceReassignExchangeTask.java | 5 +
.../distributed/near/GridNearCacheAdapter.java | 2 +-
.../distributed/near/GridNearGetFuture.java | 4 +-
.../distributed/near/GridNearLockFuture.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 1 +
.../near/GridNearOptimisticTxPrepareFuture.java | 1 +
.../GridNearPessimisticTxPrepareFuture.java | 1 +
.../near/GridNearTxPrepareRequest.java | 14 +
.../GridCacheDatabaseSharedManager.java | 6 +-
.../cache/query/GridCacheQueryAdapter.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 184 +-
.../closure/GridClosureProcessor.java | 36 +-
.../cluster/GridClusterStateProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 57 +-
.../datastreamer/DataStreamerImpl.java | 65 +-
.../datastreamer/PlatformDataStreamer.java | 3 +-
.../query/schema/SchemaExchangeWorkerTask.java | 5 +
.../SchemaNodeLeaveExchangeWorkerTask.java | 5 +
.../processors/task/GridTaskWorker.java | 8 +-
.../org/apache/ignite/thread/IgniteThread.java | 9 +
.../internal/TestDelayingCommunicationSpi.java | 63 +
...CacheExchangeMessageDuplicatedStateTest.java | 9 +-
.../IgniteClientCacheStartFailoverTest.java | 4 +-
.../IgniteClusterActivateDeactivateTest.java | 4 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 26 +-
...niteTopologyValidatorGridSplitCacheTest.java | 6 +-
...AffinityCoordinatorDynamicStartStopTest.java | 2 +-
...eAbstractDataStructuresFailoverSelfTest.java | 7 +-
.../distributed/CacheExchangeMergeTest.java | 1528 ++++++++++++++
.../CacheLateAffinityAssignmentTest.java | 598 ++++--
...CacheLoadingConcurrentGridStartSelfTest.java | 11 +
.../CacheLockReleaseNodeLeaveTest.java | 13 +-
.../distributed/CachePartitionStateTest.java | 18 +-
...ncurrentGridStartSelfTestAllowOverwrite.java | 33 +
...niteCacheClientNodeChangingTopologyTest.java | 5 +-
...teCacheClientNodePartitionsExchangeTest.java | 52 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 36 +-
.../IgniteChangeGlobalStateTest.java | 11 +-
.../join/JoinInActiveNodeToActiveCluster.java | 4 +-
.../junits/common/GridCommonAbstractTest.java | 22 +-
.../testsuites/IgniteCacheTestSuite2.java | 7 +-
.../testsuites/IgniteCacheTestSuite6.java | 3 +
.../cache/WaitMapExchangeFinishCallable.java | 4 +-
96 files changed, 7339 insertions(+), 1473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 8af66c4..39c19fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -168,6 +168,9 @@ public final class IgniteSystemProperties {
/** Maximum size for exchange history. Default value is {@code 1000}.*/
public static final String IGNITE_EXCHANGE_HISTORY_SIZE = "IGNITE_EXCHANGE_HISTORY_SIZE";
+ /** */
+ public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY";
+
/**
* Name of the system property defining name of command line program.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java b/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
new file mode 100644
index 0000000..94c5eb2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/TestDebugLog.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Debug helper that collects messages in memory and dumps them to stdout or to the {@code test_debug.log} file.
+ */
+public class TestDebugLog {
+ /** All collected records (synchronized list created with initial capacity 100_000). */
+ private static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000));
+
+ /** Time format used by the toString() methods. NOTE(review): SimpleDateFormat is not thread-safe and this instance is shared. */
+ private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS");
+
+ static class Message { // Plain text record; base class for the entry and partition records below.
+ String thread = Thread.currentThread().getName(); // Name of the thread that created the record.
+
+ String msg;
+
+ long ts = U.currentTimeMillis(); // Creation time; rendered with DEBUG_DATE_FMT in toString().
+
+ public Message(String msg) {
+ this.msg = msg;
+ }
+
+ public String toString() {
+ return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class EntryMessage extends Message { // Record tagged with a key and a value.
+ Object key;
+ Object val;
+
+ public EntryMessage(Object key, Object val, String msg) {
+ super(msg);
+
+ this.key = key;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static class PartMessage extends Message { // Record tagged with a partition number and a value.
+ int p;
+ Object val;
+
+ public PartMessage(int p, Object val, String msg) {
+ super(msg);
+
+ this.p = p;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "PartMessage [p=" + p + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
+ }
+ }
+
+ static final boolean out = false; // When true, every added record is also printed to System.out.
+
+ public static void addMessage(String msg) { // Stores a plain record; echoes only the raw text when 'out' is set.
+ msgs.add(new Message(msg));
+
+ if (out)
+ System.out.println(msg);
+ }
+
+ public static void addEntryMessage(Object key, Object val, String msg) { // Stores a key/value record.
+ if (key instanceof KeyCacheObject) // Keep the result of value(null, false) instead of the cache-object wrapper.
+ key = ((KeyCacheObject)key).value(null, false);
+
+ EntryMessage msg0 = new EntryMessage(key, val, msg);
+
+ msgs.add(msg0);
+
+ if (out) {
+ System.out.println(msg0.toString());
+
+ System.out.flush();
+ }
+ }
+
+ public static void addPartMessage(int p, Object val, String msg) { // Stores a partition record.
+ PartMessage msg0 = new PartMessage(p, val, msg);
+
+ msgs.add(msg0);
+
+ if (out) {
+ System.out.println(msg0.toString());
+
+ System.out.flush();
+ }
+ }
+
+ public static void printMessages(boolean file, Integer part) { // Drains the log to test_debug.log (file == true) or to stdout.
+ List<Object> msgs0;
+
+ synchronized (msgs) { // Copy and clear under the list's own lock so no record is lost in between.
+ msgs0 = new ArrayList<>(msgs);
+
+ msgs.clear();
+ }
+
+ if (file) {
+ try {
+ FileOutputStream out = new FileOutputStream("test_debug.log"); // Local 'out' shadows the static flag; opened without append.
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (part != null && msg instanceof PartMessage) { // Only PartMessage records are filtered by partition.
+ if (((PartMessage) msg).p != part)
+ continue;
+ }
+
+ w.println(msg.toString());
+ }
+
+ w.close(); // NOTE(review): not in a finally block, so the stream stays open if the loop above throws.
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ else { // Console mode prints every record; the 'part' filter is not applied here.
+ for (Object msg : msgs0)
+ System.out.println(msg);
+ }
+ }
+
+ public static void printKeyMessages(boolean file, Object key) { // Drains the log, skipping EntryMessage records with another key.
+ List<Object> msgs0;
+
+ synchronized (msgs) { // Copy and clear under the list's own lock so no record is lost in between.
+ msgs0 = new ArrayList<>(msgs);
+
+ msgs.clear();
+ }
+
+ if (file) {
+ try {
+ FileOutputStream out = new FileOutputStream("test_debug.log"); // Local 'out' shadows the static flag; opened without append.
+
+ PrintWriter w = new PrintWriter(out);
+
+ for (Object msg : msgs0) {
+ if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) // NOTE(review): NPE if a record has a null key.
+ continue;
+
+ w.println(msg.toString());
+ }
+
+ w.close(); // NOTE(review): not in a finally block, so the stream stays open if the loop above throws.
+
+ out.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ else {
+ for (Object msg : msgs0) {
+ if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) // NOTE(review): NPE if a record has a null key.
+ continue;
+
+ System.out.println(msg);
+ }
+ }
+ }
+
+ public static void clear() { // Drops all collected records.
+ msgs.clear();
+ }
+
+ public static void clearEntries() { // Drops only EntryMessage records; other records are kept.
+ for (Iterator it = msgs.iterator(); it.hasNext();) { // NOTE(review): iterates without synchronized (msgs), unlike the print methods.
+ Object msg = it.next();
+
+ if (msg instanceof EntryMessage)
+ it.remove();
+ }
+ }
+
+ public static void main(String[] args) { // Empty entry point.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index bd4ec3a..8739c0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -364,7 +364,7 @@ public class IgniteDiagnosticMessage implements Message {
List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures();
for (GridDhtPartitionsExchangeFuture fut : futs) {
- if (topVer.equals(fut.topologyVersion())) {
+ if (topVer.equals(fut.initialVersion())) {
sb.append("Exchange future: ").append(fut);
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 29c89a5..97e06bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -869,7 +870,13 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124..127] [-23..-27] [-36..-55]- this
+ case 128:
+ msg = new CacheGroupAffinityMessage();
+
+ break;
+
+
+ // [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
// [2048..2053] - Snapshots
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..5ac99f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
/**
@@ -81,7 +83,14 @@ public class DiscoCache {
/** Alive nodes. */
private final Set<UUID> alives = new GridConcurrentHashSet<>();
+ /** */
+ private final IgniteProductVersion minNodeVer;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
/**
+ * @param topVer Topology version.
* @param state Current cluster state.
* @param loc Local node.
* @param rmtNodes Remote nodes.
@@ -97,6 +106,7 @@ public class DiscoCache {
* @param alives Alive nodes.
*/
DiscoCache(
+ AffinityTopologyVersion topVer,
DiscoveryDataClusterState state,
ClusterNode loc,
List<ClusterNode> rmtNodes,
@@ -110,6 +120,7 @@ public class DiscoCache {
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<UUID> alives) {
+ this.topVer = topVer;
this.state = state;
this.loc = loc;
this.rmtNodes = rmtNodes;
@@ -123,6 +134,33 @@ public class DiscoCache {
this.cacheGrpAffNodes = cacheGrpAffNodes;
this.nodeMap = nodeMap;
this.alives.addAll(alives);
+
+ IgniteProductVersion minVer = null;
+
+ for (int i = 0; i < allNodes.size(); i++) {
+ ClusterNode node = allNodes.get(i);
+
+ if (minVer == null)
+ minVer = node.version();
+ else if (node.version().compareTo(minVer) < 0)
+ minVer = node.version();
+ }
+
+ minNodeVer = minVer;
+ }
+
+ /**
+ * @return Topology version this discovery cache was created with.
+ */
+ public AffinityTopologyVersion version() {
+ return topVer;
+ }
+
+ /**
+ * @return Minimum product version among {@code allNodes}, or {@code null} if that list was empty.
+ */
+ public IgniteProductVersion minimumNodeVersion() {
+ return minNodeVer;
}
/**
@@ -254,7 +292,7 @@ public class DiscoCache {
* @param id Node ID.
* @return Node.
*/
- public @Nullable ClusterNode node(UUID id) {
+ @Nullable public ClusterNode node(UUID id) {
return nodeMap.get(id);
}
@@ -280,6 +318,45 @@ public class DiscoCache {
}
/**
+ * @param order Node order.
+ * @return Server node with the given order or {@code null} if there is no such server node.
+ */
+ @Nullable public ClusterNode serverNodeByOrder(long order) {
+ int idx = serverNodeBinarySearch(order);
+
+ if (idx >= 0) // A negative index means the order was not found.
+ return srvNodes.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @param order Node order.
+ * @return Index in the server nodes list, or {@code -(insertion point) - 1} if no server node has this order.
+ */
+ private int serverNodeBinarySearch(long order) {
+ int low = 0; // NOTE(review): binary search assumes srvNodes is sorted by node order - confirm where the list is built.
+ int high = srvNodes.size() - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1; // Unsigned shift keeps the midpoint correct even if the sum overflows.
+
+ ClusterNode midVal = srvNodes.get(mid);
+
+ int cmp = Long.compare(midVal.order(), order);
+
+ if (cmp < 0)
+ low = mid + 1;
+ else if (cmp > 0)
+ high = mid - 1;
+ else
+ return mid;
+ }
+
+ return -(low + 1); // Not found: encode the insertion point as a negative value.
+ }
+
+ /**
* @param nodes Cluster nodes.
* @return Empty collection if nodes list is {@code null}
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 834ba4d..d426ca5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -621,7 +621,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ChangeGlobalStateFinishMessage stateFinishMsg = null;
if (locJoinEvt) {
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
transitionWaitFut = ctx.state().onLocalJoin(discoCache);
}
@@ -644,7 +647,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ discoCache = createDiscoCache(topSnap.get().topVer,
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
@@ -691,8 +697,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// There is no race possible between history maintenance and concurrent discovery
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
- if (discoCache == null)
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ if (discoCache == null) {
+ discoCache = createDiscoCache(nextTopVer,
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
+ }
discoCacheHist.put(nextTopVer, discoCache);
@@ -763,7 +773,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topHist.clear();
topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
- createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
+ createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -2149,12 +2159,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Called from discovery thread.
*
+ * @param topVer Topology version.
* @param state Current state.
* @param loc Local node.
* @param topSnapshot Topology snapshot.
* @return Newly created discovery cache.
*/
- @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+ @NotNull private DiscoCache createDiscoCache(
+ AffinityTopologyVersion topVer,
+ DiscoveryDataClusterState state,
ClusterNode loc,
Collection<ClusterNode> topSnapshot) {
HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2231,6 +2244,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
return new DiscoCache(
+ topVer,
state,
loc,
Collections.unmodifiableList(rmtNodes),
@@ -2373,7 +2387,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoWrk.addEvent(EVT_NODE_SEGMENTED,
AffinityTopologyVersion.NONE,
node,
- createDiscoCache(null, node, empty),
+ createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty),
empty,
null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 8669530..44b2753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,13 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
}
/**
+ * @return {@code True} if this is a real topology version (neither {@link #NONE} nor {@link #ZERO}), i.e. {@code topVer > 0}.
+ */
+ public boolean initialized() {
+ return topVer > 0;
+ }
+
+ /**
* @return Topology version with incremented minor version.
*/
public AffinityTopologyVersion nextMinorVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a8c6c59..f921251 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.affinity;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -352,6 +353,17 @@ public class GridAffinityAssignmentCache {
return aff.assignment();
}
+ /**
+ * @param topVer Topology version.
+ * @return Assignment of the ready affinity found by {@link #readyAffinity(AffinityTopologyVersion)} for this version.
+ */
+ public List<List<ClusterNode>> readyAssignments(AffinityTopologyVersion topVer) {
+ AffinityAssignment aff = readyAffinity(topVer); // Throws IllegalStateException if the version is not available.
+
+ assert aff != null : "No ready affinity [grp=" + cacheOrGrpName + ", ver=" + topVer + ']';
+
+ return aff.assignment();
+ }
/**
* Gets future that will be completed after topology with version {@code topVer} is calculated.
@@ -456,6 +468,30 @@ public class GridAffinityAssignmentCache {
}
/**
+ * @param topVer Topology version.
+ * @return Head assignment if its version equals {@code topVer}, otherwise the assignment cached for {@code topVer}.
+ */
+ public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) {
+ AffinityAssignment cache = head.get(); // Try the head assignment first.
+
+ if (!cache.topologyVersion().equals(topVer)) {
+ cache = affCache.get(topVer); // Fall back to the cached history.
+
+ if (cache == null) { // Neither the head nor the history has this version: fail with diagnostic details.
+ throw new IllegalStateException("Affinity for topology version is " +
+ "not initialized [locNode=" + ctx.discovery().localNode().id() +
+ ", grp=" + cacheOrGrpName +
+ ", topVer=" + topVer +
+ ", head=" + head.get().topologyVersion() +
+ ", history=" + affCache.keySet() +
+ ']');
+ }
+ }
+
+ return cache;
+ }
+
+ /**
* Get cached affinity for specified topology version.
*
* @param topVer Topology version.
@@ -600,6 +636,12 @@ public class GridAffinityAssignmentCache {
}
}
+ /**
+ * @return Topology versions currently present in the affinity cache (the key set of {@code affCache}, returned without copying).
+ */
+ public Collection<AffinityTopologyVersion> cachedVersions() {
+ return affCache.keySet();
+ }
/**
* Affinity ready future. Will remove itself from ready futures map.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bebf2997/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 4ee0502..9c9fb8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -184,7 +184,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
assert cacheName != null;
if (aff == null) {
- aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+ aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
if (aff == null)
throw new IgniteCheckedException("Failed to get cache affinity (cache was not started " +
@@ -303,7 +303,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
if (key == null)
return null;
- AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+ AffinityInfo affInfo = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
if (affInfo == null)
return null;
@@ -329,7 +329,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
*/
private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
Collection<? extends K> keys) throws IgniteCheckedException {
- return keysToNodes(cacheName, keys, ctx.discovery().topologyVersionEx());
+ return keysToNodes(cacheName, keys, ctx.cache().context().exchange().readyAffinityVersion());
}
/**
@@ -974,7 +974,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
private AffinityInfo cache() throws IgniteCheckedException {
- AffinityInfo aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+ AffinityInfo aff = affinityCache(cacheName, ctx.cache().context().exchange().readyAffinityVersion());
if (aff == null)
throw new IgniteException("Failed to find cache (cache was not started " +