You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/10 10:50:21 UTC
[14/26] ignite git commit: ignite-1911 Added special stop method to avoid hangs inside Ignition.start.
ignite-1911 Added special stop method to avoid hangs inside Ignition.start.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0bd3395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0bd3395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0bd3395
Branch: refs/heads/ignite-1537
Commit: e0bd3395896cd6df433b04c87d94d26b0cbe0469
Parents: 1a3c784
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 10 09:54:39 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 10 09:54:39 2015 +0300
----------------------------------------------------------------------
.../main/java/org/apache/ignite/Ignition.java | 2 +-
.../org/apache/ignite/internal/IgnitionEx.java | 24 +++++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 11 ++--
.../IgniteCacheEntryListenerAbstractTest.java | 8 ++-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 5 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 63 ++++++++++++++++++--
...niteCacheP2pUnmarshallingQueryErrorTest.java | 4 --
7 files changed, 95 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 4fdc849..99ee1d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -221,7 +221,7 @@ public class Ignition {
* not found).
*/
public static boolean stop(@Nullable String name, boolean cancel) {
- return IgnitionEx.stop(name, cancel);
+ return IgnitionEx.stop(name, cancel, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index a73fdeb..9b886e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -286,7 +286,7 @@ public class IgnitionEx {
* {@code false} otherwise (if it was not started).
*/
public static boolean stop(boolean cancel) {
- return stop(null, cancel);
+ return stop(null, cancel, false);
}
/**
@@ -304,13 +304,20 @@ public class IgnitionEx {
* execution. If {@code false}, then jobs currently running will not be
* canceled. In either case, grid node will wait for completion of all
* jobs running on it before stopping.
+ * @param stopNotStarted If {@code true} and node start did not finish then interrupts starting thread.
* @return {@code true} if named grid instance was indeed found and stopped,
* {@code false} otherwise (the instance with given {@code name} was
* not found).
*/
- public static boolean stop(@Nullable String name, boolean cancel) {
+ public static boolean stop(@Nullable String name, boolean cancel, boolean stopNotStarted) {
IgniteNamedInstance grid = name != null ? grids.get(name) : dfltGrid;
+ if (grid != null && stopNotStarted && grid.startLatch.getCount() != 0) {
+ grid.starterThreadInterrupted = true;
+
+ grid.starterThread.interrupt();
+ }
+
if (grid != null && grid.state() == STARTED) {
grid.stop(cancel);
@@ -963,7 +970,15 @@ public class IgnitionEx {
boolean success = false;
try {
- grid.start(startCtx);
+ try {
+ grid.start(startCtx);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ if (grid.starterThreadInterrupted)
+ Thread.interrupted();
+
+ throw e;
+ }
notifyStateChange(name, STARTED);
@@ -1413,6 +1428,9 @@ public class IgnitionEx {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private Thread starterThread;
+ /** */
+ private boolean starterThreadInterrupted;
+
/**
* Creates un-started named instance.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 865f73f..b80529b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -66,6 +66,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -2165,22 +2166,22 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ignite != null) {
U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +
- "Stopping the grid in order to prevent cluster wide instability.", e);
+ "Stopping the node in order to prevent cluster wide instability.", e);
new Thread(new Runnable() {
@Override public void run() {
try {
- ignite.close();
+ IgnitionEx.stop(ignite.name(), true, true);
- U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " +
+ U.log(log, "Stopped the node successfully in response to TcpDiscoverySpi's " +
"message worker thread abnormal termination.");
}
catch (Throwable e) {
- U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " +
+ U.error(log, "Failed to stop the node in response to TcpDiscoverySpi's " +
"message worker thread abnormal termination.", e);
}
}
- }).start();
+ }, "node-stop-thread").start();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 8a3d756..27edb0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -113,7 +113,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
for (int i = 0; i < gridCount(); i++) {
GridContinuousProcessor proc = grid(i).context().continuous();
- ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
+ final ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return syncMsgFuts.size() == 0;
+ }
+ }, 5000);
assertEquals(0, syncMsgFuts.size());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 6b20b2a..35aa934 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -296,7 +296,6 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** {@inheritDoc} */
@Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
throws IOException, IgniteCheckedException {
-
if (readDelay < failureDetectionTimeout()) {
try {
return super.readMessage(sock, in, timeout);
@@ -313,9 +312,11 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
if (msg instanceof TcpDiscoveryPingRequest) {
try {
Thread.sleep(2000);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
// Ignore
}
+
throw new SocketTimeoutException("Forced timeout");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 42960e7..862e780 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1334,7 +1334,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
*/
public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
try {
- TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi();
+ TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1();
nodeSpi.set(spi0);
@@ -1351,9 +1351,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
final UUID failedNodeId = ignite0.cluster().localNode().id();
ignite1.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event event) {
- if (event.type() == EventType.EVT_NODE_FAILED &&
- failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EventType.EVT_NODE_FAILED &&
+ failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
disconnected.set(true);
latch.countDown();
@@ -1382,6 +1382,38 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
stopAllGrids();
}
}
+ /**
+ * @throws Exception If failed
+ */
+ public void testNodeShutdownOnRingMessageWorkerStartNotFinished() throws Exception {
+ try {
+ Ignite ignite0 = startGrid(0);
+
+ TestMessageWorkerFailureSpi2 spi0 = new TestMessageWorkerFailureSpi2();
+
+ nodeSpi.set(spi0);
+
+ try {
+ startGrid(1);
+
+ fail();
+ }
+ catch (Exception e) {
+ log.error("Expected error: " + e, e);
+ }
+
+ Ignite ignite1 = startGrid(1);
+
+ assertEquals(2, ignite1.cluster().nodes().size());
+ assertEquals(4, ignite1.cluster().topologyVersion());
+
+ assertEquals(2, ignite0.cluster().nodes().size());
+ assertEquals(4, ignite0.cluster().topologyVersion());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
/**
@@ -1952,11 +1984,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/**
*
*/
- private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+ private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi {
/** */
private volatile boolean stop;
-
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
@@ -1969,6 +2000,26 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi {
+ /** */
+ private volatile boolean stop;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (stop)
+ throw new RuntimeException("Failing ring message worker explicitly");
+
+ super.writeToSocket(sock, msg, bout, timeout);
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ stop = true;
+ }
+ }
+
+ /**
* Starts new grid with given index. Method optimize is not invoked.
*
* @param idx Index of the grid to start.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
index 6f8ca2d..a92451f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
@@ -76,10 +76,6 @@ public class IgniteCacheP2pUnmarshallingQueryErrorTest extends IgniteCacheP2pUnm
private void readObject(ObjectInputStream is) throws IOException {
throw new IOException();
}
-
- private void writeObject(ObjectOutputStream os) throws IOException {
- throw new IOException();
- }
})).getAll();
assertTrue("Request unmarshalling failed, but error response was not sent.", portableMarshaller());