You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/16 15:26:05 UTC
[1/4] incubator-ignite git commit: # ignite-883 client reconnect issues
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-sprint-6 a9228c07a -> f2c4cc801
# ignite-883 client reconnect issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/54bfa36c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/54bfa36c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/54bfa36c
Branch: refs/heads/ignite-sprint-6
Commit: 54bfa36c7417109832effe9c59c0120d9249b1b9
Parents: f4b1123
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 16 12:14:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 16 14:03:03 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 87 ++++++++++++++------
.../distributed/IgniteCacheManyClientsTest.java | 66 +++++++++++----
2 files changed, 116 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/54bfa36c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index a17296c..fef6f4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -545,14 +545,31 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Discovery message.
* @return Latest topology snapshot.
*/
- private NavigableSet<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
+ private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
this.topVer = topVer;
+ if (!topHist.isEmpty() && topVer <= topHist.lastKey()) {
+ if (log.isDebugEnabled())
+ log.debug("Skip topology update since topology already updated [msg=" + msg +
+ ", lastHistKey=" + topHist.lastKey() +
+ ", topVer=" + topVer +
+ ", locNode=" + locNode + ']');
+
+ Collection<ClusterNode> top = topHist.get(topVer);
+
+ assert top != null : msg;
+
+ return top;
+ }
+
NavigableSet<ClusterNode> allNodes = allVisibleNodes();
if (!topHist.containsKey(topVer)) {
assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + topHist.lastKey() + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg;
+ "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
+ ", newVer=" + topVer +
+ ", locNode=" + locNode +
+ ", msg=" + msg;
topHist.put(topVer, allNodes);
@@ -886,7 +903,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param join {@code True} if reconnects during join.
*/
protected Reconnector(boolean join) {
- super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+ super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
this.join = join;
}
@@ -944,7 +961,8 @@ class ClientImpl extends TcpDiscoveryImpl {
sock.setKeepAlive(true);
sock.setTcpNoDelay(true);
- // Wait for
+ List<TcpDiscoveryAbstractMessage> msgs = null;
+
while (!isInterrupted()) {
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -955,12 +973,23 @@ class ClientImpl extends TcpDiscoveryImpl {
if (res.success()) {
msgWorker.addMessage(res);
+ if (msgs != null) {
+ for (TcpDiscoveryAbstractMessage msg0 : msgs)
+ msgWorker.addMessage(msg0);
+ }
+
success = true;
}
return;
}
}
+ else if (spi.ensured(msg)) {
+ if (msgs == null)
+ msgs = new ArrayList<>();
+
+ msgs.add(msg);
+ }
}
}
catch (IOException | IgniteCheckedException e) {
@@ -1286,23 +1315,32 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- if (!topHist.isEmpty() && msg.topologyVersion() <= topHist.lastKey()) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message since topology already updated " +
- "[msg=" + msg + ", lastHistKey=" + topHist.lastKey() + ", node=" + node + ']');
-
- return;
- }
+ boolean evt = false;
long topVer = msg.topologyVersion();
- node.order(topVer);
- node.visible(true);
+ assert topVer > 0 : msg;
+
+ if (!node.visible()) {
+ node.order(topVer);
+ node.visible(true);
+
+ if (spi.locNodeVer.equals(node.version()))
+ node.version(spi.locNodeVer);
+
+ evt = true;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']');
+
+ assert node.order() == topVer : node;
+ }
- if (spi.locNodeVer.equals(node.version()))
- node.version(spi.locNodeVer);
+ Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
- NavigableSet<ClusterNode> top = updateTopologyHistory(topVer, msg);
+ assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
+ ", node=" + node + ", top=" + top + ']';
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1311,9 +1349,11 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+ if (evt) {
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
- spi.stats.onNodeJoined();
+ spi.stats.onNodeJoined();
+ }
}
}
@@ -1340,7 +1380,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
+ Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1383,7 +1423,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
+ Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1555,7 +1595,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param node Node.
* @param top Topology snapshot.
*/
- private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
notifyDiscovery(type, topVer, node, top, null);
}
@@ -1564,8 +1604,9 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param topVer Topology version.
* @param node Node.
* @param top Topology snapshot.
+ * @param data Optional custom message data.
*/
- private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
@Nullable DiscoverySpiCustomMessage data) {
DiscoverySpiListener lsnr = spi.lsnr;
@@ -1589,7 +1630,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- *
+ * @return Queue size.
*/
public int queueSize() {
return queue.size();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/54bfa36c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 62c7c1a..947ded2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import junit.framework.*;
import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
@@ -102,16 +104,14 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testManyClients() throws Exception {
+ public void testManyClients() throws Throwable {
manyClientsPutGet();
}
/**
* @throws Exception If failed.
*/
- public void testManyClientsClientDiscovery() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-883");
-
+ public void testManyClientsClientDiscovery() throws Throwable {
clientDiscovery = true;
manyClientsPutGet();
@@ -121,8 +121,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testManyClientsSequentiallyClientDiscovery() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-883");
-
clientDiscovery = true;
manyClientsSequentially();
@@ -162,33 +160,48 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
log.info("All clients started.");
- assertEquals(SRVS + CLIENTS, G.allGrids().size());
+ checkNodes(SRVS + CLIENTS);
+
+ for (Ignite client : clients)
+ client.close();
+ }
+
+ /**
+ * @param expCnt Expected number of nodes.
+ */
+ private void checkNodes(int expCnt) {
+ assertEquals(expCnt, G.allGrids().size());
long topVer = -1L;
for (Ignite ignite : G.allGrids()) {
- assertEquals(SRVS + CLIENTS, ignite.cluster().nodes().size());
+ log.info("Check node: " + ignite.name());
if (topVer == -1L)
topVer = ignite.cluster().topologyVersion();
else
- assertEquals(topVer, ignite.cluster().topologyVersion());
- }
+ assertEquals("Unexpected topology version for node: " + ignite.name(),
+ topVer,
+ ignite.cluster().topologyVersion());
- for (Ignite client : clients)
- client.close();
+ assertEquals("Unexpected number of nodes for node: " + ignite.name(),
+ expCnt,
+ ignite.cluster().nodes().size());
+ }
}
/**
* @throws Exception If failed.
*/
- private void manyClientsPutGet() throws Exception {
+ private void manyClientsPutGet() throws Throwable {
client = true;
final AtomicInteger idx = new AtomicInteger(SRVS);
final AtomicBoolean stop = new AtomicBoolean();
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+
final int THREADS = 50;
final CountDownLatch latch = new CountDownLatch(THREADS);
@@ -224,7 +237,7 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
counted = true;
- while (!stop.get()) {
+ while (!stop.get() && err.get() == null) {
key = rnd.nextInt(0, 1000);
cache.put(key, iter++);
@@ -240,6 +253,8 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
return null;
}
catch (Throwable e) {
+ err.compareAndSet(null, e);
+
log.error("Unexpected error in client thread: " + e, e);
throw e;
@@ -257,6 +272,29 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
Thread.sleep(10_000);
+ Throwable err0 = err.get();
+
+ if (err0 != null)
+ throw err0;
+
+ boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ checkNodes(SRVS + THREADS);
+
+ return true;
+ }
+ catch (AssertionFailedError e) {
+ log.info("Check failed, will retry: " + e);
+ }
+
+ return false;
+ }
+ }, 10_000);
+
+ if (!wait)
+ checkNodes(SRVS + THREADS);
+
log.info("Stop clients.");
stop.set(true);
[2/4] incubator-ignite git commit: # ignite-883 fixed test
Posted by sb...@apache.org.
# ignite-883 fixed test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/22ec5cf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/22ec5cf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/22ec5cf2
Branch: refs/heads/ignite-sprint-6
Commit: 22ec5cf2a50ac7719ddc25ed5feb7ca06c491931
Parents: 54bfa36
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 16 15:58:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 16 15:58:53 2015 +0300
----------------------------------------------------------------------
.../IgniteCacheClientNodeConcurrentStart.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/22ec5cf2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java
index bd74ece..1eff7fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java
@@ -37,7 +37,7 @@ public class IgniteCacheClientNodeConcurrentStart extends GridCommonAbstractTest
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final int NODES_CNT = 5;
+ private static final int NODES_CNT = 6;
/** */
private Set<Integer> clientNodes;
@@ -83,13 +83,21 @@ public class IgniteCacheClientNodeConcurrentStart extends GridCommonAbstractTest
clientNodes = new HashSet<>();
while (clientNodes.size() < 2)
- clientNodes.add(rnd.nextInt(0, NODES_CNT));
+ clientNodes.add(rnd.nextInt(1, NODES_CNT));
clientNodes.add(NODES_CNT - 1);
log.info("Test iteration [iter=" + i + ", clients=" + clientNodes + ']');
- startGridsMultiThreaded(NODES_CNT, true);
+ Ignite srv = startGrid(0); // Start server node first.
+
+ assertFalse(srv.configuration().isClientMode());
+
+ startGridsMultiThreaded(1, NODES_CNT - 1);
+
+ checkTopology(NODES_CNT);
+
+ awaitPartitionMapExchange();
for (int node : clientNodes) {
Ignite ignite = grid(node);
[4/4] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-883' into ignite-sprint-6
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-883' into ignite-sprint-6
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2c4cc80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2c4cc80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2c4cc80
Branch: refs/heads/ignite-sprint-6
Commit: f2c4cc8017fd29adc398f3b98cde361a1156d6a6
Parents: f6b1b79 22ec5cf
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 16 16:24:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 16 16:24:52 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 87 ++++++++++++++------
.../IgniteCacheClientNodeConcurrentStart.java | 14 +++-
.../distributed/IgniteCacheManyClientsTest.java | 66 +++++++++++----
3 files changed, 127 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-ignite git commit: # ignite-sprint-6 enabled test
Posted by sb...@apache.org.
# ignite-sprint-6 enabled test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f6b1b79f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f6b1b79f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f6b1b79f
Branch: refs/heads/ignite-sprint-6
Commit: f6b1b79fa3f354bde733d729c8c99016e09956f8
Parents: a9228c0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 16 16:20:28 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 16 16:23:59 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheVersionMultinodeTest.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f6b1b79f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionMultinodeTest.java
index cecb4a9..91dcbf1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionMultinodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionMultinodeTest.java
@@ -81,7 +81,7 @@ public class GridCacheVersionMultinodeTest extends GridCacheAbstractSelfTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-114");
+ // No-op.
}
/** {@inheritDoc} */
@@ -240,12 +240,14 @@ public class GridCacheVersionMultinodeTest extends GridCacheAbstractSelfTest {
if (e != null) {
if (ver != null) {
- assertEquals("Non-equal versions for key: " + key, ver, e.version());
+ assertEquals("Non-equal versions for key: " + key,
+ ver,
+ e instanceof GridNearCacheEntry ? ((GridNearCacheEntry)e).dhtVersion() : e.version());
verified = true;
}
else
- ver = e.version();
+ ver = e instanceof GridNearCacheEntry ? ((GridNearCacheEntry)e).dhtVersion() : e.version();
}
}