You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/07/08 11:00:01 UTC
[49/50] incubator-ignite git commit: merge from ignite-747
merge from ignite-747
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4031db76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4031db76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4031db76
Branch: refs/heads/ignite-843
Commit: 4031db76d2bd9992001a5b63f17af7739e82cff0
Parents: 0f1b31a
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 8 10:19:11 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 8 10:19:11 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 20 +++++++++++
.../tcp/internal/TcpDiscoveryNode.java | 2 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 8 ++++-
.../tcp/internal/TcpDiscoveryStatistics.java | 10 ++++--
...acheAtomicReplicatedNodeRestartSelfTest.java | 8 ++---
.../tcp/TcpDiscoveryMultiThreadedTest.java | 38 ++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 3 ++
7 files changed, 81 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f8fae34..d51293e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2881,6 +2881,24 @@ class ServerImpl extends TcpDiscoveryImpl {
msg.verify(locNodeId);
}
+ else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
+ // Local node already has node from message in local topology.
+ // Just pass it to coordinator via the ring.
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+
+ if (log.isDebugEnabled())
+ log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
+ "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ + locNode + ", msg=" + msg + ']');
+
+ if (debugMode)
+ debugLog("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
+ "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ + locNode + ", msg=" + msg + ']');
+
+ return;
+ }
if (msg.verified() && !locNodeId.equals(node.id())) {
if (node.internalOrder() <= ring.maxInternalOrder()) {
@@ -3163,6 +3181,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
assert node != null;
+ assert topVer > 0 : "Invalid topology version: " + msg;
+
ring.topologyVersion(topVer);
node.order(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 36ae39e..4b4df45 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -300,7 +300,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
* @param order Order of the node.
*/
public void order(long order) {
- assert order >= 0 : "Order is invalid: " + this;
+ assert order > 0 : "Order is invalid: " + this;
this.order = order;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e9eaa1d..acb479d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -34,7 +34,13 @@ public class TcpDiscoveryNodesRing {
/** Visible nodes filter. */
public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
@Override public boolean apply(TcpDiscoveryNode node) {
- return node.visible();
+ if (node.visible()) {
+ assert node.order() > 0 : "Invalid node order: " + node;
+
+ return true;
+ }
+
+ return false;
}
};
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index da8c4ea..377d8a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -256,7 +256,10 @@ public class TcpDiscoveryStatistics {
if (maxMsgQueueTime < duration)
maxMsgQueueTime = duration;
- avgMsgQueueTime = (avgMsgQueueTime * (totalReceivedMessages() -1)) / totalProcessedMessages();
+ int totalProcMsgs = totalProcessedMessages();
+
+ if (totalProcMsgs != 0)
+ avgMsgQueueTime = (avgMsgQueueTime * (totalProcMsgs - 1)) / totalProcMsgs;
}
msgsProcStartTs.put(msg.id(), U.currentTimeMillis());
@@ -275,7 +278,10 @@ public class TcpDiscoveryStatistics {
if (startTs != null) {
long duration = U.currentTimeMillis() - startTs;
- avgMsgProcTime = (avgMsgProcTime * (totalProcessedMessages() - 1) + duration) / totalProcessedMessages();
+ int totalProcMsgs = totalProcessedMessages();
+
+ if (totalProcMsgs != 0)
+ avgMsgProcTime = (avgMsgProcTime * (totalProcMsgs - 1) + duration) / totalProcMsgs;
if (duration > maxMsgProcTime) {
maxMsgProcTime = duration;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
index 54409d1..b4ed18d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java
@@ -26,17 +26,17 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
*/
public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-747");
+ @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1095");
}
/** {@inheritDoc} */
- @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable {
+ @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable {
fail("https://issues.apache.org/jira/browse/IGNITE-1095");
}
/** {@inheritDoc} */
- @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable {
+ @Override public void testRestartWithPutTenNodesTwoBackups() throws Throwable {
fail("https://issues.apache.org/jira/browse/IGNITE-1095");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index cfefff4..0bf7cad 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -21,8 +21,10 @@ import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.junits.common.*;
import java.util.concurrent.*;
@@ -100,6 +102,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
* @throws Exception If any error occurs.
*/
public void testMultiThreaded() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1100");
+
execute();
}
@@ -126,6 +130,40 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If any error occurs.
+ */
+ public void testMultipleStartOnCoordinatorStop() throws Exception{
+ startGrids(GRID_CNT);
+
+ final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4);
+
+ final AtomicInteger startIdx = new AtomicInteger(GRID_CNT);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ barrier.await();
+
+ Ignite ignite = startGrid(startIdx.getAndIncrement());
+
+ assertFalse(ignite.configuration().isClientMode());
+
+ log.info("Started node: " + ignite.name());
+
+ return null;
+ }
+ }, GRID_CNT + 3, "start-thread");
+
+ barrier.await();
+
+ U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+ for (int i = 0; i < GRID_CNT; i++)
+ stopGrid(i);
+
+ fut.get();
+ }
+
+ /**
* @throws Exception If failed.
*/
private void execute() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 498f50c..6f59f14 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -57,6 +57,9 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
+ suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class));
+ suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class));
+
return suite;
}
}