You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/10 10:50:21 UTC

[14/26] ignite git commit: ignite-1911 Added special stop method to avoid hangs inside Ignition.start.

ignite-1911 Added special stop method to avoid hangs inside Ignition.start.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0bd3395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0bd3395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0bd3395

Branch: refs/heads/ignite-1537
Commit: e0bd3395896cd6df433b04c87d94d26b0cbe0469
Parents: 1a3c784
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 10 09:54:39 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 10 09:54:39 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/Ignition.java   |  2 +-
 .../org/apache/ignite/internal/IgnitionEx.java  | 24 +++++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 11 ++--
 .../IgniteCacheEntryListenerAbstractTest.java   |  8 ++-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java |  5 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 63 ++++++++++++++++++--
 ...niteCacheP2pUnmarshallingQueryErrorTest.java |  4 --
 7 files changed, 95 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 4fdc849..99ee1d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -221,7 +221,7 @@ public class Ignition {
      *      not found).
      */
     public static boolean stop(@Nullable String name, boolean cancel) {
-        return IgnitionEx.stop(name, cancel);
+        return IgnitionEx.stop(name, cancel, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index a73fdeb..9b886e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -286,7 +286,7 @@ public class IgnitionEx {
      *      {@code false} otherwise (if it was not started).
      */
     public static boolean stop(boolean cancel) {
-        return stop(null, cancel);
+        return stop(null, cancel, false);
     }
 
     /**
@@ -304,13 +304,20 @@ public class IgnitionEx {
      *      execution. If {@code false}, then jobs currently running will not be
      *      canceled. In either case, grid node will wait for completion of all
      *      jobs running on it before stopping.
+     * @param stopNotStarted If {@code true} and node start did not finish then interrupts starting thread.
      * @return {@code true} if named grid instance was indeed found and stopped,
      *      {@code false} otherwise (the instance with given {@code name} was
      *      not found).
      */
-    public static boolean stop(@Nullable String name, boolean cancel) {
+    public static boolean stop(@Nullable String name, boolean cancel, boolean stopNotStarted) {
         IgniteNamedInstance grid = name != null ? grids.get(name) : dfltGrid;
 
+        if (grid != null && stopNotStarted && grid.startLatch.getCount() != 0) {
+            grid.starterThreadInterrupted = true;
+
+            grid.starterThread.interrupt();
+        }
+
         if (grid != null && grid.state() == STARTED) {
             grid.stop(cancel);
 
@@ -963,7 +970,15 @@ public class IgnitionEx {
         boolean success = false;
 
         try {
-            grid.start(startCtx);
+            try {
+                grid.start(startCtx);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                if (grid.starterThreadInterrupted)
+                    Thread.interrupted();
+
+                throw e;
+            }
 
             notifyStateChange(name, STARTED);
 
@@ -1413,6 +1428,9 @@ public class IgnitionEx {
         @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
         private Thread starterThread;
 
+        /** */
+        private boolean starterThreadInterrupted;
+
         /**
          * Creates un-started named instance.
          *

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 865f73f..b80529b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -66,6 +66,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -2165,22 +2166,22 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     if (ignite != null) {
                         U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +
-                            "Stopping the grid in order to prevent cluster wide instability.", e);
+                            "Stopping the node in order to prevent cluster wide instability.", e);
 
                         new Thread(new Runnable() {
                             @Override public void run() {
                                 try {
-                                    ignite.close();
+                                    IgnitionEx.stop(ignite.name(), true, true);
 
-                                    U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " +
+                                    U.log(log, "Stopped the node successfully in response to TcpDiscoverySpi's " +
                                         "message worker thread abnormal termination.");
                                 }
                                 catch (Throwable e) {
-                                    U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " +
+                                    U.error(log, "Failed to stop the node in response to TcpDiscoverySpi's " +
                                         "message worker thread abnormal termination.", e);
                                 }
                             }
-                        }).start();
+                        }, "node-stop-thread").start();
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 8a3d756..27edb0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -113,7 +113,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         for (int i = 0; i < gridCount(); i++) {
             GridContinuousProcessor proc = grid(i).context().continuous();
 
-            ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
+            final ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return syncMsgFuts.size() == 0;
+                }
+            }, 5000);
 
             assertEquals(0, syncMsgFuts.size());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 6b20b2a..35aa934 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -296,7 +296,6 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
         /** {@inheritDoc} */
         @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
             throws IOException, IgniteCheckedException {
-
             if (readDelay < failureDetectionTimeout()) {
                 try {
                     return super.readMessage(sock, in, timeout);
@@ -313,9 +312,11 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
                 if (msg instanceof TcpDiscoveryPingRequest) {
                     try {
                         Thread.sleep(2000);
-                    } catch (InterruptedException e) {
+                    }
+                    catch (InterruptedException e) {
                         // Ignore
                     }
+
                     throw new SocketTimeoutException("Forced timeout");
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 42960e7..862e780 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1334,7 +1334,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
         try {
-            TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi();
+            TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1();
 
             nodeSpi.set(spi0);
 
@@ -1351,9 +1351,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             final UUID failedNodeId = ignite0.cluster().localNode().id();
 
             ignite1.events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event event) {
-                    if (event.type() == EventType.EVT_NODE_FAILED &&
-                        failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EventType.EVT_NODE_FAILED &&
+                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
                         disconnected.set(true);
 
                     latch.countDown();
@@ -1382,6 +1382,38 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
+    /**
+     * @throws Exception If failed
+     */
+    public void testNodeShutdownOnRingMessageWorkerStartNotFinished() throws Exception {
+        try {
+            Ignite ignite0 = startGrid(0);
+
+            TestMessageWorkerFailureSpi2 spi0 = new TestMessageWorkerFailureSpi2();
+
+            nodeSpi.set(spi0);
+
+            try {
+                startGrid(1);
+
+                fail();
+            }
+            catch (Exception e) {
+                log.error("Expected error: " + e, e);
+            }
+
+            Ignite ignite1 = startGrid(1);
+
+            assertEquals(2, ignite1.cluster().nodes().size());
+            assertEquals(4, ignite1.cluster().topologyVersion());
+
+            assertEquals(2, ignite0.cluster().nodes().size());
+            assertEquals(4, ignite0.cluster().topologyVersion());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
 
 
     /**
@@ -1952,11 +1984,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+    private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi {
         /** */
         private volatile boolean stop;
 
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
             GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
@@ -1969,6 +2000,26 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi {
+        /** */
+        private volatile boolean stop;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+            if (stop)
+                throw new RuntimeException("Failing ring message worker explicitly");
+
+            super.writeToSocket(sock, msg, bout, timeout);
+
+            if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                stop = true;
+        }
+    }
+
+    /**
      * Starts new grid with given index. Method optimize is not invoked.
      *
      * @param idx Index of the grid to start.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0bd3395/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
index 6f8ca2d..a92451f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java
@@ -76,10 +76,6 @@ public class IgniteCacheP2pUnmarshallingQueryErrorTest extends IgniteCacheP2pUnm
                 private void readObject(ObjectInputStream is) throws IOException {
                     throw new IOException();
                 }
-
-                private void writeObject(ObjectOutputStream os) throws IOException {
-                    throw new IOException();
-                }
             })).getAll();
 
             assertTrue("Request unmarshalling failed, but error response was not sent.", portableMarshaller());