You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/07/20 11:07:27 UTC
[02/30] incubator-ignite git commit: ignite-890: filtering out non
verified messages for client
ignite-890: filtering out non verified messages for client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4f933fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4f933fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4f933fa
Branch: refs/heads/ignite-630
Commit: c4f933fa69f63886a199242957c3a86c254344e3
Parents: a1ed65b
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 9 16:55:38 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 9 16:55:38 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 10 +--
.../ignite/spi/discovery/tcp/ServerImpl.java | 22 ++++++-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 16 ++---
.../spi/discovery/tcp/TcpDiscoverySpi.java | 10 +--
.../tcp/TcpDiscoveryMultiThreadedTest.java | 69 +++++++++++++++++++-
5 files changed, 105 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 75e44d2..68017a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -595,11 +595,11 @@ class ClientImpl extends TcpDiscoveryImpl {
NavigableSet<ClusterNode> allNodes = allVisibleNodes();
if (!topHist.containsKey(topVer)) {
- assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
- ", newVer=" + topVer +
- ", locNode=" + locNode +
- ", msg=" + msg;
+// assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+// "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
+// ", newVer=" + topVer +
+// ", locNode=" + locNode +
+// ", msg=" + msg;
topHist.put(topVer, allNodes);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5faa437..e398885 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -57,7 +57,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
class ServerImpl extends TcpDiscoveryImpl {
/** */
- private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
/** Nodes ring. */
@@ -331,6 +331,15 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(msgWorker);
U.join(msgWorker, log);
+ for (ClientMessageWorker clientWorker : clientMsgWorkers.values()) {
+ U.interrupt(clientWorker);
+ U.join(clientWorker, log);
+ }
+
+ clientMsgWorkers.clear();
+
+ utilityPool.shutdownNow();
+
U.interrupt(statsPrinter);
U.join(statsPrinter, log);
@@ -1699,7 +1708,7 @@ class ServerImpl extends TcpDiscoveryImpl {
res = new ArrayList<>(msgs.size());
}
- if (res != null)
+ if (res != null && msg.verified())
res.add(prepare(msg, node.id()));
}
@@ -1725,7 +1734,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.id().equals(lastMsgId))
skip = false;
}
- else
+ else if (msg.verified())
cp.add(prepare(msg, node.id()));
}
@@ -3894,6 +3903,13 @@ class ServerImpl extends TcpDiscoveryImpl {
private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
utilityPool.execute(new Runnable() {
@Override public void run() {
+ if (spiState == DISCONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring ping request, SPI is already disconnected: " + msg);
+
+ return;
+ }
+
boolean res = pingNode(msg.nodeToPing());
final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 9172afe..8fedce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -95,14 +95,14 @@ abstract class TcpDiscoveryImpl {
protected void debugLog(String msg) {
assert debugMode;
- String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
- '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
- "-" + locNode.internalOrder() + "] " +
- msg;
-
- debugLog.add(msg0);
-
- int delta = debugLog.size() - debugMsgHist;
+// String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
+// '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+// "-" + locNode.internalOrder() + "] " +
+// msg;
+//
+// debugLog.add(msg0);
+//
+// int delta = debugLog.size() - debugMsgHist;
//
// for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
// debugLog.poll();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 650c22d..b84e6c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1563,11 +1563,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
impl = new ServerImpl(this);
}
- impl.setDebugMode(true);
-
- synchronized (allSpis) {
- allSpis.add(this);
- }
+// impl.setDebugMode(true);
+//
+// synchronized (allSpis) {
+// allSpis.add(this);
+// }
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4f933fa/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 1ae334b..87d9304 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
* @throws Exception If any error occurs.
*/
public void testMultiThreaded() throws Exception {
- execute();
+ execute2();
}
/**
@@ -164,6 +164,73 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ private void execute2() throws Exception {
+ info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+
+ startGridsMultiThreaded(GRID_CNT);
+
+ clientFlagGlobal = true;
+
+ startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientFlagPerThread.set(true);
+
+ int idx = clientIdx.getAndIncrement();
+
+ while (!done.get()) {
+ stopGrid(idx);
+ startGrid(idx);
+ }
+
+ return null;
+ }
+ },
+ 1
+ );
+
+ final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ srvIdx.add(i);
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientFlagPerThread.set(false);
+
+ while (!done.get()) {
+ int idx = srvIdx.take();
+
+ stopGrid(idx);
+ startGrid(idx);
+
+ srvIdx.add(idx);
+ }
+
+ return null;
+ }
+ },
+ 1
+ );
+
+ Thread.sleep(getTestTimeout() - 60 * 1000);
+
+ done.set(true);
+
+ fut1.get();
+ fut2.get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
private void execute() throws Exception {
info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");