You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/26 11:25:28 UTC

[47/47] ignite git commit: ignite-3967 Do not use GridBoundedConcurrentOrderedMap.clear

ignite-3967 Do not use GridBoundedConcurrentOrderedMap.clear


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b1bd783
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b1bd783
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b1bd783

Branch: refs/heads/ignite-3967
Commit: 2b1bd783cdd4568cd2e9883be705dace430c4a6e
Parents: 8032fc2
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 26 14:24:21 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 26 14:24:21 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  2 +-
 .../discovery/GridDiscoveryManager.java         | 66 ++++++++++----------
 .../util/GridBoundedConcurrentOrderedMap.java   |  5 ++
 .../IgniteClientReconnectCacheTest.java         | 35 +++++++++++
 4 files changed, 73 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 6c5a628..e0a36a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1126,7 +1126,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ackStart(rtBean);
 
         if (!isDaemon())
-            ctx.discovery().ackTopology();
+            ctx.discovery().ackTopology(localNode().order());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index bbf3ebd..8cb003d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,6 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -81,7 +80,7 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -113,15 +112,14 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -210,8 +208,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private final AtomicBoolean lastSegChkRes = new AtomicBoolean(true);
 
     /** Topology cache history. */
-    private final ConcurrentNavigableMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
-        new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
+    private final GridBoundedConcurrentLinkedHashMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
+        new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE);
 
     /** Topology snapshots history. */
     private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
@@ -599,7 +597,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     registeredCaches.clear();
 
-                    discoCacheHist.clear();
+                    for (AffinityTopologyVersion histVer : discoCacheHist.keySet())
+                        discoCacheHist.remove(histVer);
 
                     topHist.clear();
 
@@ -1199,9 +1198,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * Prints the latest topology info into log taking into account logging/verbosity settings.
+     *
+     * @param topVer Topology version.
      */
-    public void ackTopology() {
-        ackTopology(topSnap.get().topVer.topologyVersion(), false);
+    public void ackTopology(long topVer) {
+        ackTopology(topVer, false);
     }
 
     /**
@@ -1213,7 +1214,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private void ackTopology(long topVer, boolean throttle) {
         assert !isLocDaemon;
 
-        DiscoCache discoCache = discoCache();
+        DiscoCache discoCache = discoCacheHist.get(new AffinityTopologyVersion(topVer));
+
+        if (discoCache == null) {
+            String msg = "Failed to resolve nodes topology [topVer=" + topVer +
+                ", hist=" + discoCacheHist.keySet() + ']';
+
+            if (log.isQuiet())
+                U.quiet(false, msg);
+
+            if (log.isDebugEnabled())
+                log.debug(msg);
+            else if (log.isInfoEnabled())
+                log.info(msg);
+
+            return;
+        }
 
         Collection<ClusterNode> rmtNodes = discoCache.remoteNodes();
 
@@ -1237,7 +1253,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         double heap = U.heapSize(allNodes, 2);
 
         if (log.isQuiet())
-            U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
+            U.quiet(false, topologySnapshotMessage(topVer, srvNodes.size(), clientNodes.size(), totalCpus, heap));
 
         if (log.isDebugEnabled()) {
             String dbg = "";
@@ -1281,19 +1297,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             log.debug(dbg);
         }
         else if (log.isInfoEnabled())
-            log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
+            log.info(topologySnapshotMessage(topVer, srvNodes.size(), clientNodes.size(), totalCpus, heap));
     }
 
     /**
+     * @param topVer Topology version.
      * @param srvNodesNum Server nodes number.
      * @param clientNodesNum Client nodes number.
      * @param totalCpus Total cpu number.
      * @param heap Heap size.
      * @return Topology snapshot message.
      */
-    private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
+    private String topologySnapshotMessage(long topVer, int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
         return PREFIX + " [" +
-            (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") +
+            (discoOrdered ? "ver=" + topVer + ", " : "") +
             "servers=" + srvNodesNum +
             ", clients=" + clientNodesNum +
             ", CPUs=" + totalCpus +
@@ -1506,7 +1523,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @return Discovery collection cache.
      */
-    public DiscoCache discoCache() {
+    private DiscoCache discoCache() {
         Snapshot cur = topSnap.get();
 
         assert cur != null;
@@ -1533,15 +1550,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         return discoCache().allNodes();
     }
 
-    /**
-     * Gets topology grouped by node versions.
-     *
-     * @return Version to collection of nodes map.
-     */
-    public NavigableMap<IgniteProductVersion, Collection<ClusterNode>> topologyVersionMap() {
-        return discoCache().versionsMap();
-    }
-
     /** @return Full topology size. */
     public int size() {
         return discoCache().allNodes().size();
@@ -1782,16 +1790,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             snap.discoCache : discoCacheHist.get(topVer);
 
         if (cache == null) {
-            // Find the eldest acceptable discovery cache.
-            Map.Entry<AffinityTopologyVersion, DiscoCache> eldest = discoCacheHist.firstEntry();
-
-            if (eldest != null) {
-                if (topVer.compareTo(eldest.getKey()) < 0)
-                    cache = eldest.getValue();
-            }
-        }
-
-        if (cache == null) {
             throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
                 ", topVer=" + topVer +
                 ", history=" + discoCacheHist.keySet() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
index 3f6db30..33b2e60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
@@ -236,4 +236,9 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
 
         return rmvd;
     }
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2b1bd783/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index ad6c46f..0f0165b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import junit.framework.AssertionFailedError;
@@ -1084,6 +1085,21 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
      * @throws Exception If failed.
      */
     public void testReconnectMultinode() throws Exception {
+        reconnectMultinode(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectMultinodeLongHistory() throws Exception {
+        reconnectMultinode(true);
+    }
+
+    /**
+     * @param longHist If {@code true} generates many discovery events to overflow events history.
+     * @throws Exception If failed.
+     */
+    private void reconnectMultinode(boolean longHist) throws Exception {
         grid(0).createCache(new CacheConfiguration<>());
 
         clientMode = true;
@@ -1100,6 +1116,25 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
             clients.add(client);
         }
 
+        if (longHist) {
+            // Generate many discovery events to overflow discovery events history.
+            final AtomicInteger nodeIdx = new AtomicInteger(SRV_CNT + CLIENTS);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int idx = nodeIdx.incrementAndGet();
+
+                    for (int i  = 0; i < 25; i++) {
+                        startGrid(idx);
+
+                        stopGrid(idx);
+                    }
+
+                    return null;
+                }
+            }, 4, "restart-thread");
+        }
+
         int nodes = SRV_CNT + CLIENTS;
         int srvNodes = SRV_CNT;