You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/16 17:50:42 UTC

[02/36] incubator-ignite git commit: ignite-890: filtering out non verified messages for client

ignite-890: filtering out non verified messages for client


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4f933fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4f933fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4f933fa

Branch: refs/heads/ignite-961
Commit: c4f933fa69f63886a199242957c3a86c254344e3
Parents: a1ed65b
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 9 16:55:38 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 9 16:55:38 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 10 +--
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 22 ++++++-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     | 16 ++---
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 10 +--
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 69 +++++++++++++++++++-
 5 files changed, 105 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 75e44d2..68017a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -595,11 +595,11 @@ class ClientImpl extends TcpDiscoveryImpl {
         NavigableSet<ClusterNode> allNodes = allVisibleNodes();
 
         if (!topHist.containsKey(topVer)) {
-            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
-                "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
-                ", newVer=" + topVer +
-                ", locNode=" + locNode +
-                ", msg=" + msg;
+//            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+//                "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
+//                ", newVer=" + topVer +
+//                ", locNode=" + locNode +
+//                ", msg=" + msg;
 
             topHist.put(topVer, allNodes);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5faa437..e398885 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -57,7 +57,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
-    private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
 
     /** Nodes ring. */
@@ -331,6 +331,15 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(msgWorker);
         U.join(msgWorker, log);
 
+        for (ClientMessageWorker clientWorker : clientMsgWorkers.values()) {
+            U.interrupt(clientWorker);
+            U.join(clientWorker, log);
+        }
+
+        clientMsgWorkers.clear();
+
+        utilityPool.shutdownNow();
+
         U.interrupt(statsPrinter);
         U.join(statsPrinter, log);
 
@@ -1699,7 +1708,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             res = new ArrayList<>(msgs.size());
                     }
 
-                    if (res != null)
+                    if (res != null && msg.verified())
                         res.add(prepare(msg, node.id()));
                 }
 
@@ -1725,7 +1734,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (msg.id().equals(lastMsgId))
                             skip = false;
                     }
-                    else
+                    else if (msg.verified())
                         cp.add(prepare(msg, node.id()));
                 }
 
@@ -3894,6 +3903,13 @@ class ServerImpl extends TcpDiscoveryImpl {
         private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
             utilityPool.execute(new Runnable() {
                 @Override public void run() {
+                    if (spiState == DISCONNECTED) {
+                        if (log.isDebugEnabled())
+                            log.debug("Ignoring ping request, SPI is already disconnected: " + msg);
+
+                        return;
+                    }
+
                     boolean res = pingNode(msg.nodeToPing());
 
                     final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 9172afe..8fedce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -95,14 +95,14 @@ abstract class TcpDiscoveryImpl {
     protected void debugLog(String msg) {
         assert debugMode;
 
-        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
-            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
-            "-" + locNode.internalOrder() + "] " +
-            msg;
-
-        debugLog.add(msg0);
-
-        int delta = debugLog.size() - debugMsgHist;
+//        String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
+//            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+//            "-" + locNode.internalOrder() + "] " +
+//            msg;
+//
+//        debugLog.add(msg0);
+//
+//        int delta = debugLog.size() - debugMsgHist;
 //
 //        for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
 //            debugLog.poll();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 650c22d..b84e6c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1563,11 +1563,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             impl = new ServerImpl(this);
         }
 
-        impl.setDebugMode(true);
-
-        synchronized (allSpis) {
-            allSpis.add(this);
-        }
+//        impl.setDebugMode(true);
+//
+//        synchronized (allSpis) {
+//            allSpis.add(this);
+//        }
 
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 1ae334b..87d9304 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
      * @throws Exception If any error occurs.
      */
     public void testMultiThreaded() throws Exception {
-        execute();
+        execute2();
     }
 
     /**
@@ -164,6 +164,73 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    private void execute2() throws Exception {
+        info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        clientFlagGlobal = true;
+
+        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    clientFlagPerThread.set(true);
+
+                    int idx = clientIdx.getAndIncrement();
+
+                    while (!done.get()) {
+                        stopGrid(idx);
+                        startGrid(idx);
+                    }
+
+                    return null;
+                }
+            },
+            1
+        );
+
+        final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            srvIdx.add(i);
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    clientFlagPerThread.set(false);
+
+                    while (!done.get()) {
+                        int idx = srvIdx.take();
+
+                        stopGrid(idx);
+                        startGrid(idx);
+
+                        srvIdx.add(idx);
+                    }
+
+                    return null;
+                }
+            },
+            1
+        );
+
+        Thread.sleep(getTestTimeout() - 60 * 1000);
+
+        done.set(true);
+
+        fut1.get();
+        fut2.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void execute() throws Exception {
         info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");