You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/30 10:26:39 UTC

[46/49] ignite git commit: Fixed communication subsystem stop notification.

Fixed communication subsystem stop notification.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59f37266
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59f37266
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59f37266

Branch: refs/heads/ignite-1537
Commit: 59f3726696ec47b52f68103aa8250d9f2015b49b
Parents: 3a6a463
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Sat Nov 28 18:46:51 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Sat Nov 28 18:46:51 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 88 +-------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |  4 +-
 .../cache/transactions/IgniteTxHandler.java     |  4 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 17 ++--
 .../ignite/testframework/GridTestUtils.java     | 61 +++++++++++++-
 5 files changed, 76 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index ea82d7f..a8557cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -46,8 +46,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -103,9 +101,6 @@ import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIM
  * Grid communication manager.
  */
 public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
-    /** */
-    public static volatile boolean TURBO_DEBUG_MODE;
-
     /** Empty array of message factories. */
     public static final MessageFactory[] EMPTY = {};
 
@@ -775,7 +770,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private void processRegularMessage(
         final UUID nodeId,
         final GridIoMessage msg,
-        byte plc,
+        final byte plc,
         final IgniteRunnable msgC
     ) throws IgniteCheckedException {
         Runnable c = new Runnable() {
@@ -956,7 +951,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         if (msgC == null) {
             // Message from local node can be processed in sync manner.
-            assert locNodeId.equals(nodeId) || TURBO_DEBUG_MODE;
+            assert locNodeId.equals(nodeId);
 
             unwindMessageSet(set, lsnr);
 
@@ -1089,85 +1084,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * This method can be used for debugging tricky concurrency issues
-     * with multi-nodes in single JVM.
-     * <p>
-     * This method eliminates network between nodes started in single JVM
-     * when {@link #TURBO_DEBUG_MODE} is set to {@code true}.
-     * <p>
-     * How to use it:
-     * <ol>
-     *     <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)}
-     *          with this method.</li>
-     *     <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li>
-     *     <li>Perform test operations on the topology. No network will be there.</li>
-     *     <li>DO NOT turn on turbo debug before all grids started. This will cause deadlocks.</li>
-     * </ol>
-     *
-     * @param node Destination node.
-     * @param topic Topic to send the message to.
-     * @param topicOrd GridTopic enumeration ordinal.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param ordered Ordered flag.
-     * @param timeout Timeout.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    private void sendTurboDebug(
-        ClusterNode node,
-        Object topic,
-        int topicOrd,
-        Message msg,
-        byte plc,
-        boolean ordered,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert node != null;
-        assert topic != null;
-        assert msg != null;
-
-        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
-
-        IgniteKernal rmt;
-
-        if (locNodeId.equals(node.id())) {
-            assert plc != P2P_POOL;
-
-            CommunicationListener commLsnr = this.commLsnr;
-
-            if (commLsnr == null)
-                throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
-
-            if (ordered)
-                processOrderedMessage(locNodeId, ioMsg, plc, null);
-            else
-                processRegularMessage0(ioMsg, locNodeId);
-        }
-        else if (TURBO_DEBUG_MODE && (rmt = IgnitionEx.gridxx(locNodeId)) != null) {
-            if (ioMsg.isOrdered())
-                rmt.context().io().processOrderedMessage(locNodeId, ioMsg, ioMsg.policy(), null);
-            else
-                rmt.context().io().processRegularMessage0(ioMsg, locNodeId);
-        }
-        else {
-            if (topicOrd < 0)
-                ioMsg.topicBytes(marsh.marshal(topic));
-
-            try {
-                getSpi().sendMessage(node, ioMsg);
-            }
-            catch (IgniteSpiException e) {
-                throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
-                    "TCP connection cannot be established due to firewall issues) " +
-                    "[node=" + node + ", topic=" + topic +
-                    ", msg=" + msg + ", policy=" + plc + ']', e);
-            }
-        }
-    }
-
-    /**
      * @param nodeId Id of destination node.
      * @param topic Topic to send the message to.
      * @param msg Message to send.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 34addfa..9f1f8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -81,8 +81,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -1208,7 +1206,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         assert req.transactionNodes() != null;
 
                         try {
-                            cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                            cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
                         }
                         catch (ClusterTopologyCheckedException e) {
                             fut.onNodeLeft(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 61a9bed..91ebfd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -842,7 +842,7 @@ public class IgniteTxHandler {
 
         try {
             // Reply back to sender.
-            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+            ctx.io().send(nodeId, res, req.policy());
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException) {
@@ -1060,7 +1060,7 @@ public class IgniteTxHandler {
             }
 
             try {
-                ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                ctx.io().send(nodeId, res, req.policy());
             }
             catch (Throwable e) {
                 // Double-check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 68e2f43..b2f0f65 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -838,6 +838,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Context initialization latch. */
     private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
 
+    /** Stopping flag (set to {@code true} when SPI gets stopping signal). */
+    private volatile boolean stopping;
+
     /** metrics listener. */
     private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
         @Override public void onBytesSent(int bytesCnt) {
@@ -1794,6 +1797,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override protected void onContextDestroyed0() {
+        stopping = true;
+
         if (ctxInitLatch.getCount() > 0)
             // Safety.
             ctxInitLatch.countDown();
@@ -1976,7 +1981,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client = clients.get(nodeId);
 
             if (client == null) {
-                if (isNodeStopping())
+                if (stopping)
                     throw new IgniteSpiException("Node is stopping.");
 
                 // Do not allow concurrent connects.
@@ -2311,8 +2316,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         U.closeQuiet(ch);
 
-                        throw new ClusterTopologyCheckedException("Failed to send message, " +
-                            "node left cluster: " + node);
+                        throw new ClusterTopologyCheckedException("Failed to send message " +
+                            "(node left topology): " + node);
                     }
 
                     long rcvCnt = -1;
@@ -2784,18 +2789,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Node ID message.
      */
     private NodeIdMessage nodeIdMessage() {
-        ClusterNode localNode = getLocalNode();
+        ClusterNode locNode = getLocalNode();
 
         UUID id;
 
-        if (localNode == null) {
+        if (locNode == null) {
             U.warn(log, "Local node is not started or fully initialized [isStopping=" +
                     getSpiContext().isStopping() + ']');
 
             id = new UUID(0, 0);
         }
         else
-            id = localNode.id();
+            id = locNode.id();
 
         return new NodeIdMessage(id);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index d1c3d9f..7116227 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -42,10 +42,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
 import javax.cache.configuration.Factory;
@@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsClosure;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -88,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.ssl.SslContextFactory;
 import org.apache.ignite.testframework.config.GridTestProperties;
@@ -147,6 +154,9 @@ public final class GridTestUtils {
     /** */
     private static final GridBusyLock busyLock = new GridBusyLock();
 
+    /** */
+    public static final ConcurrentMap<IgnitePair<UUID>, IgnitePair<Queue<Message>>> msgMap = new ConcurrentHashMap<>();
+
     /**
      * Ensure singleton.
      */
@@ -155,6 +165,55 @@ public final class GridTestUtils {
     }
 
     /**
+     * @param from From node ID.
+     * @param to To node ID.
+     * @param msg Message.
+     * @param sent Sent or received.
+     */
+    public static void addMessage(UUID from, UUID to, Message msg, boolean sent) {
+        IgnitePair<UUID> key = F.pair(from, to);
+
+        IgnitePair<Queue<Message>> val = msgMap.get(key);
+
+        if (val == null) {
+            IgnitePair<Queue<Message>> old = msgMap.putIfAbsent(key,
+                val = F.<Queue<Message>>pair(new ConcurrentLinkedQueue<Message>(), new ConcurrentLinkedQueue<Message>()));
+
+            if (old != null)
+                val = old;
+        }
+
+        (sent ? val.get1() : val.get2()).add(msg);
+    }
+
+    /**
+     * Dumps all messages tracked with {@link #addMessage(UUID, UUID, Message, boolean)} to std out.
+     */
+    public static void dumpMessages() {
+        for (Map.Entry<IgnitePair<UUID>, IgnitePair<Queue<Message>>> entry : msgMap.entrySet()) {
+            U.debug("\n" + entry.getKey().get1() + " [sent to] " + entry.getKey().get2());
+
+            for (Message message : entry.getValue().get1())
+                U.debug("\t" + message);
+
+            U.debug(entry.getKey().get2() + " [received from] " + entry.getKey().get1());
+
+            for (Message message : entry.getValue().get2())
+                U.debug("\t" + message);
+        }
+    }
+
+//    static {
+//        new Thread(new Runnable() {
+//            @Override public void run() {
+//                JOptionPane.showMessageDialog(null, "Close this to dump messages.");
+//
+//                dumpMessages();
+//            }
+//        }).start();
+//    }
+
+    /**
      * Checks whether callable throws expected exception or not.
      *
      * @param log Logger (optional).
@@ -1728,4 +1787,4 @@ public final class GridTestUtils {
         /** Evict to offheap with eviction policy + evict from offheap to swap when max offheap memory limit is reached. */
         OFFHEAP_EVICT_SWAP,
     }
-}
\ No newline at end of file
+}