You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/26 11:25:28 UTC
[47/47] ignite git commit: ignite-3967 Do not use
GridBoundedConcurrentOrderedMap.clear
ignite-3967 Do not use GridBoundedConcurrentOrderedMap.clear
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b1bd783
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b1bd783
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b1bd783
Branch: refs/heads/ignite-3967
Commit: 2b1bd783cdd4568cd2e9883be705dace430c4a6e
Parents: 8032fc2
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 26 14:24:21 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 26 14:24:21 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../discovery/GridDiscoveryManager.java | 66 ++++++++++----------
.../util/GridBoundedConcurrentOrderedMap.java | 5 ++
.../IgniteClientReconnectCacheTest.java | 35 +++++++++++
4 files changed, 73 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 6c5a628..e0a36a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1126,7 +1126,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ackStart(rtBean);
if (!isDaemon())
- ctx.discovery().ackTopology();
+ ctx.discovery().ackTopology(localNode().order());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index bbf3ebd..8cb003d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,6 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -81,7 +80,7 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -113,15 +112,14 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -210,8 +208,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
/** Topology cache history. */
- private final ConcurrentNavigableMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
- new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
+ private final GridBoundedConcurrentLinkedHashMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
+ new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE);
/** Topology snapshots history. */
private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
@@ -599,7 +597,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
registeredCaches.clear();
- discoCacheHist.clear();
+ for (AffinityTopologyVersion histVer : discoCacheHist.keySet())
+ discoCacheHist.remove(histVer);
topHist.clear();
@@ -1199,9 +1198,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Prints the latest topology info into log taking into account logging/verbosity settings.
+ *
+ * @param topVer Topology version.
*/
- public void ackTopology() {
- ackTopology(topSnap.get().topVer.topologyVersion(), false);
+ public void ackTopology(long topVer) {
+ ackTopology(topVer, false);
}
/**
@@ -1213,7 +1214,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private void ackTopology(long topVer, boolean throttle) {
assert !isLocDaemon;
- DiscoCache discoCache = discoCache();
+ DiscoCache discoCache = discoCacheHist.get(new AffinityTopologyVersion(topVer));
+
+ if (discoCache == null) {
+ String msg = "Failed to resolve nodes topology [topVer=" + topVer +
+ ", hist=" + discoCacheHist.keySet() + ']';
+
+ if (log.isQuiet())
+ U.quiet(false, msg);
+
+ if (log.isDebugEnabled())
+ log.debug(msg);
+ else if (log.isInfoEnabled())
+ log.info(msg);
+
+ return;
+ }
Collection<ClusterNode> rmtNodes = discoCache.remoteNodes();
@@ -1237,7 +1253,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
double heap = U.heapSize(allNodes, 2);
if (log.isQuiet())
- U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
+ U.quiet(false, topologySnapshotMessage(topVer, srvNodes.size(), clientNodes.size(), totalCpus, heap));
if (log.isDebugEnabled()) {
String dbg = "";
@@ -1281,19 +1297,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
log.debug(dbg);
}
else if (log.isInfoEnabled())
- log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
+ log.info(topologySnapshotMessage(topVer, srvNodes.size(), clientNodes.size(), totalCpus, heap));
}
/**
+ * @param topVer Topology version.
* @param srvNodesNum Server nodes number.
* @param clientNodesNum Client nodes number.
* @param totalCpus Total cpu number.
* @param heap Heap size.
* @return Topology snapshot message.
*/
- private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
+ private String topologySnapshotMessage(long topVer, int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
return PREFIX + " [" +
- (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") +
+ (discoOrdered ? "ver=" + topVer + ", " : "") +
"servers=" + srvNodesNum +
", clients=" + clientNodesNum +
", CPUs=" + totalCpus +
@@ -1506,7 +1523,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
*
* @return Discovery collection cache.
*/
- public DiscoCache discoCache() {
+ private DiscoCache discoCache() {
Snapshot cur = topSnap.get();
assert cur != null;
@@ -1533,15 +1550,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return discoCache().allNodes();
}
- /**
- * Gets topology grouped by node versions.
- *
- * @return Version to collection of nodes map.
- */
- public NavigableMap<IgniteProductVersion, Collection<ClusterNode>> topologyVersionMap() {
- return discoCache().versionsMap();
- }
-
/** @return Full topology size. */
public int size() {
return discoCache().allNodes().size();
@@ -1782,16 +1790,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- // Find the eldest acceptable discovery cache.
- Map.Entry<AffinityTopologyVersion, DiscoCache> eldest = discoCacheHist.firstEntry();
-
- if (eldest != null) {
- if (topVer.compareTo(eldest.getKey()) < 0)
- cache = eldest.getValue();
- }
- }
-
- if (cache == null) {
throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
", topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
index 3f6db30..33b2e60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
@@ -236,4 +236,9 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
return rmvd;
}
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index ad6c46f..0f0165b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import junit.framework.AssertionFailedError;
@@ -1084,6 +1085,21 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
* @throws Exception If failed.
*/
public void testReconnectMultinode() throws Exception {
+ reconnectMultinode(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectMultinodeLongHistory() throws Exception {
+ reconnectMultinode(true);
+ }
+
+ /**
+ * @param longHist If {@code true} generates many discovery events to overflow events history.
+ * @throws Exception If failed.
+ */
+ private void reconnectMultinode(boolean longHist) throws Exception {
grid(0).createCache(new CacheConfiguration<>());
clientMode = true;
@@ -1100,6 +1116,25 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
clients.add(client);
}
+ if (longHist) {
+ // Generate many discovery events to overflow discovery events history.
+ final AtomicInteger nodeIdx = new AtomicInteger(SRV_CNT + CLIENTS);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = nodeIdx.incrementAndGet();
+
+ for (int i = 0; i < 25; i++) {
+ startGrid(idx);
+
+ stopGrid(idx);
+ }
+
+ return null;
+ }
+ }, 4, "restart-thread");
+ }
+
int nodes = SRV_CNT + CLIENTS;
int srvNodes = SRV_CNT;