You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/24 12:36:36 UTC
[24/24] incubator-ignite git commit: ignite-1139: - fixed race in
GridDhtPartitionsExchangeFuture - fixed NPE in TcpCommunicationSpi when the
SPI was not yet fully initialized
ignite-1139:
- fixed race in GridDhtPartitionsExchangeFuture
- fixed NPE in TcpCommunicationSpi when the SPI was not yet fully initialized
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/84f8b956
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/84f8b956
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/84f8b956
Branch: refs/heads/ignite-1139
Commit: 84f8b956e40ae88d11e0ef125442203a497b8c4b
Parents: 89da409
Author: dmagda <ma...@gmail.com>
Authored: Fri Jul 24 13:35:32 2015 +0300
Committer: dmagda <ma...@gmail.com>
Committed: Fri Jul 24 13:35:32 2015 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 20 +++++-----
.../communication/tcp/TcpCommunicationSpi.java | 39 ++++++++++++++++++--
2 files changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3664220..cbf6b40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -583,7 +583,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
onDone(exchId.topologyVersion());
}
else
- sendPartitions();
+ sendPartitions(oldest);
}
else {
rmtIds = Collections.emptyList();
@@ -816,9 +816,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Initialized future: " + this);
+ ClusterNode oldest = oldestNode.get();
+
// If this node is not oldest.
- if (!oldestNode.get().id().equals(cctx.localNodeId()))
- sendPartitions();
+ if (!oldest.id().equals(cctx.localNodeId()))
+ sendPartitions(oldest);
else {
boolean allReceived = allReceived();
@@ -948,11 +950,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- *
+ * @param oldestNode Oldest node.
*/
- private void sendPartitions() {
- ClusterNode oldestNode = this.oldestNode.get();
-
+ private void sendPartitions(ClusterNode oldestNode) {
try {
sendLocalPartitions(oldestNode, exchId);
}
@@ -1402,8 +1402,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
*/
private void recheck() {
+ ClusterNode oldest = oldestNode.get();
+
// If this is the oldest node.
- if (oldestNode.get().id().equals(cctx.localNodeId())) {
+ if (oldest.id().equals(cctx.localNodeId())) {
Collection<UUID> remaining = remaining();
if (!remaining.isEmpty()) {
@@ -1423,7 +1425,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
else
- sendPartitions();
+ sendPartitions(oldest);
// Schedule another send.
scheduleRecheck();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e9fd696..5159e18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1717,7 +1717,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
- if (node.id().equals(getLocalNode().id()))
+ ClusterNode localNode = getLocalNode();
+
+ if (localNode == null)
+ throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" +
+ getSpiContext().isStopping() + ']');
+
+ if (node.id().equals(localNode.id()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
@@ -2263,8 +2269,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ ClusterNode localNode = getLocalNode();
+
+ if (localNode == null)
+ throw new IgniteCheckedException("Local node is not started or fully initialized [isStopping=" +
+ getSpiContext().isStopping() + ']');
+
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
+ HandshakeMessage msg = new HandshakeMessage(localNode.id(),
recovery.incrementConnectCount(),
recovery.receivedCount());
@@ -2415,7 +2427,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @return Node ID message.
*/
private NodeIdMessage nodeIdMessage() {
- return new NodeIdMessage(getLocalNode().id());
+ ClusterNode localNode = getLocalNode();
+
+ UUID id;
+
+ if (localNode == null) {
+ log.warning("Local node is not started or fully initialized [isStopping=" +
+ getSpiContext().isStopping() + ']');
+
+ id = new UUID(0, 0);
+ }
+ else
+ id = localNode.id();
+
+ return new NodeIdMessage(id);
}
/** {@inheritDoc} */
@@ -2931,7 +2956,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- UUID id = getLocalNode().id();
+ ClusterNode localNode = getLocalNode();
+
+ if (localNode == null)
+ throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" +
+ getSpiContext().isStopping() + ']');
+
+ UUID id = localNode.id();
NodeIdMessage msg = new NodeIdMessage(id);