You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/30 10:26:39 UTC
[46/49] ignite git commit: Fixed communication subsystem stop notification.
Fixed communication subsystem stop notification.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59f37266
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59f37266
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59f37266
Branch: refs/heads/ignite-1537
Commit: 59f3726696ec47b52f68103aa8250d9f2015b49b
Parents: 3a6a463
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Sat Nov 28 18:46:51 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Sat Nov 28 18:46:51 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 88 +-------------------
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../communication/tcp/TcpCommunicationSpi.java | 17 ++--
.../ignite/testframework/GridTestUtils.java | 61 +++++++++++++-
5 files changed, 76 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index ea82d7f..a8557cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -46,8 +46,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -103,9 +101,6 @@ import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIM
* Grid communication manager.
*/
public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
- /** */
- public static volatile boolean TURBO_DEBUG_MODE;
-
/** Empty array of message factories. */
public static final MessageFactory[] EMPTY = {};
@@ -775,7 +770,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private void processRegularMessage(
final UUID nodeId,
final GridIoMessage msg,
- byte plc,
+ final byte plc,
final IgniteRunnable msgC
) throws IgniteCheckedException {
Runnable c = new Runnable() {
@@ -956,7 +951,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (msgC == null) {
// Message from local node can be processed in sync manner.
- assert locNodeId.equals(nodeId) || TURBO_DEBUG_MODE;
+ assert locNodeId.equals(nodeId);
unwindMessageSet(set, lsnr);
@@ -1089,85 +1084,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
- * This method can be used for debugging tricky concurrency issues
- * with multi-nodes in single JVM.
- * <p>
- * This method eliminates network between nodes started in single JVM
- * when {@link #TURBO_DEBUG_MODE} is set to {@code true}.
- * <p>
- * How to use it:
- * <ol>
- * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)}
- * with this method.</li>
- * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li>
- * <li>Perform test operations on the topology. No network will be there.</li>
- * <li>DO NOT turn on turbo debug before all grids started. This will cause deadlocks.</li>
- * </ol>
- *
- * @param node Destination node.
- * @param topic Topic to send the message to.
- * @param topicOrd GridTopic enumeration ordinal.
- * @param msg Message to send.
- * @param plc Type of processing.
- * @param ordered Ordered flag.
- * @param timeout Timeout.
- * @param skipOnTimeout Whether message can be skipped on timeout.
- * @throws IgniteCheckedException Thrown in case of any errors.
- */
- private void sendTurboDebug(
- ClusterNode node,
- Object topic,
- int topicOrd,
- Message msg,
- byte plc,
- boolean ordered,
- long timeout,
- boolean skipOnTimeout
- ) throws IgniteCheckedException {
- assert node != null;
- assert topic != null;
- assert msg != null;
-
- GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
-
- IgniteKernal rmt;
-
- if (locNodeId.equals(node.id())) {
- assert plc != P2P_POOL;
-
- CommunicationListener commLsnr = this.commLsnr;
-
- if (commLsnr == null)
- throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
-
- if (ordered)
- processOrderedMessage(locNodeId, ioMsg, plc, null);
- else
- processRegularMessage0(ioMsg, locNodeId);
- }
- else if (TURBO_DEBUG_MODE && (rmt = IgnitionEx.gridxx(locNodeId)) != null) {
- if (ioMsg.isOrdered())
- rmt.context().io().processOrderedMessage(locNodeId, ioMsg, ioMsg.policy(), null);
- else
- rmt.context().io().processRegularMessage0(ioMsg, locNodeId);
- }
- else {
- if (topicOrd < 0)
- ioMsg.topicBytes(marsh.marshal(topic));
-
- try {
- getSpi().sendMessage(node, ioMsg);
- }
- catch (IgniteSpiException e) {
- throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
- "TCP connection cannot be established due to firewall issues) " +
- "[node=" + node + ", topic=" + topic +
- ", msg=" + msg + ", policy=" + plc + ']', e);
- }
- }
- }
-
- /**
* @param nodeId Id of destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 34addfa..9f1f8a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -81,8 +81,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -1208,7 +1206,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
assert req.transactionNodes() != null;
try {
- cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
fut.onNodeLeft(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 61a9bed..91ebfd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -842,7 +842,7 @@ public class IgniteTxHandler {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nodeId, res, req.policy());
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException) {
@@ -1060,7 +1060,7 @@ public class IgniteTxHandler {
}
try {
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nodeId, res, req.policy());
}
catch (Throwable e) {
// Double-check.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 68e2f43..b2f0f65 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -838,6 +838,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Context initialization latch. */
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
+ /** Stopping flag (set to {@code true} when SPI gets stopping signal). */
+ private volatile boolean stopping;
+
/** metrics listener. */
private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
@Override public void onBytesSent(int bytesCnt) {
@@ -1794,6 +1797,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override protected void onContextDestroyed0() {
+ stopping = true;
+
if (ctxInitLatch.getCount() > 0)
// Safety.
ctxInitLatch.countDown();
@@ -1976,7 +1981,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
GridCommunicationClient client = clients.get(nodeId);
if (client == null) {
- if (isNodeStopping())
+ if (stopping)
throw new IgniteSpiException("Node is stopping.");
// Do not allow concurrent connects.
@@ -2311,8 +2316,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
U.closeQuiet(ch);
- throw new ClusterTopologyCheckedException("Failed to send message, " +
- "node left cluster: " + node);
+ throw new ClusterTopologyCheckedException("Failed to send message " +
+ "(node left topology): " + node);
}
long rcvCnt = -1;
@@ -2784,18 +2789,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Node ID message.
*/
private NodeIdMessage nodeIdMessage() {
- ClusterNode localNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
UUID id;
- if (localNode == null) {
+ if (locNode == null) {
U.warn(log, "Local node is not started or fully initialized [isStopping=" +
getSpiContext().isStopping() + ']');
id = new UUID(0, 0);
}
else
- id = localNode.id();
+ id = locNode.id();
return new NodeIdMessage(id);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index d1c3d9f..7116227 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -42,10 +42,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
@@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsClosure;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
@@ -88,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -147,6 +154,9 @@ public final class GridTestUtils {
/** */
private static final GridBusyLock busyLock = new GridBusyLock();
+ /** */
+ public static final ConcurrentMap<IgnitePair<UUID>, IgnitePair<Queue<Message>>> msgMap = new ConcurrentHashMap<>();
+
/**
* Ensure singleton.
*/
@@ -155,6 +165,55 @@ public final class GridTestUtils {
}
/**
+ * @param from From node ID.
+ * @param to To node ID.
+ * @param msg Message.
+ * @param sent Sent or received.
+ */
+ public static void addMessage(UUID from, UUID to, Message msg, boolean sent) {
+ IgnitePair<UUID> key = F.pair(from, to);
+
+ IgnitePair<Queue<Message>> val = msgMap.get(key);
+
+ if (val == null) {
+ IgnitePair<Queue<Message>> old = msgMap.putIfAbsent(key,
+ val = F.<Queue<Message>>pair(new ConcurrentLinkedQueue<Message>(), new ConcurrentLinkedQueue<Message>()));
+
+ if (old != null)
+ val = old;
+ }
+
+ (sent ? val.get1() : val.get2()).add(msg);
+ }
+
+ /**
+ * Dumps all messages tracked with {@link #addMessage(UUID, UUID, Message, boolean)} to std out.
+ */
+ public static void dumpMessages() {
+ for (Map.Entry<IgnitePair<UUID>, IgnitePair<Queue<Message>>> entry : msgMap.entrySet()) {
+ U.debug("\n" + entry.getKey().get1() + " [sent to] " + entry.getKey().get2());
+
+ for (Message message : entry.getValue().get1())
+ U.debug("\t" + message);
+
+ U.debug(entry.getKey().get2() + " [received from] " + entry.getKey().get1());
+
+ for (Message message : entry.getValue().get2())
+ U.debug("\t" + message);
+ }
+ }
+
+// static {
+// new Thread(new Runnable() {
+// @Override public void run() {
+// JOptionPane.showMessageDialog(null, "Close this to dump messages.");
+//
+// dumpMessages();
+// }
+// }).start();
+// }
+
+ /**
* Checks whether callable throws expected exception or not.
*
* @param log Logger (optional).
@@ -1728,4 +1787,4 @@ public final class GridTestUtils {
/** Evict to offheap with eviction policy + evict from offheap to swap when max offheap memory limit is reached. */
OFFHEAP_EVICT_SWAP,
}
-}
\ No newline at end of file
+}