You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/24 12:36:36 UTC

[24/24] incubator-ignite git commit: ignite-1139: - fixed race in GridDhtPartitionsExchangeFuture - fixed NPE in TcpCommunicationSpi when this SPI was not in the fully initialized state

ignite-1139:
- fixed a race in GridDhtPartitionsExchangeFuture
- fixed an NPE in TcpCommunicationSpi that occurred when the SPI was not fully initialized


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/84f8b956
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/84f8b956
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/84f8b956

Branch: refs/heads/ignite-1139
Commit: 84f8b956e40ae88d11e0ef125442203a497b8c4b
Parents: 89da409
Author: dmagda <ma...@gmail.com>
Authored: Fri Jul 24 13:35:32 2015 +0300
Committer: dmagda <ma...@gmail.com>
Committed: Fri Jul 24 13:35:32 2015 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 20 +++++-----
 .../communication/tcp/TcpCommunicationSpi.java  | 39 ++++++++++++++++++--
 2 files changed, 46 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3664220..cbf6b40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -583,7 +583,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             onDone(exchId.topologyVersion());
                         }
                         else
-                            sendPartitions();
+                            sendPartitions(oldest);
                     }
                     else {
                         rmtIds = Collections.emptyList();
@@ -816,9 +816,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Initialized future: " + this);
 
+            ClusterNode oldest = oldestNode.get();
+
             // If this node is not oldest.
-            if (!oldestNode.get().id().equals(cctx.localNodeId()))
-                sendPartitions();
+            if (!oldest.id().equals(cctx.localNodeId()))
+                sendPartitions(oldest);
             else {
                 boolean allReceived = allReceived();
 
@@ -948,11 +950,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     *
+     * @param oldestNode Oldest node.
      */
-    private void sendPartitions() {
-        ClusterNode oldestNode = this.oldestNode.get();
-
+    private void sendPartitions(ClusterNode oldestNode) {
         try {
             sendLocalPartitions(oldestNode, exchId);
         }
@@ -1402,8 +1402,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     private void recheck() {
+        ClusterNode oldest = oldestNode.get();
+
         // If this is the oldest node.
-        if (oldestNode.get().id().equals(cctx.localNodeId())) {
+        if (oldest.id().equals(cctx.localNodeId())) {
             Collection<UUID> remaining = remaining();
 
             if (!remaining.isEmpty()) {
@@ -1423,7 +1425,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
         else
-            sendPartitions();
+            sendPartitions(oldest);
 
         // Schedule another send.
         scheduleRecheck();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/84f8b956/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e9fd696..5159e18 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1717,7 +1717,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log.isTraceEnabled())
             log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
 
-        if (node.id().equals(getLocalNode().id()))
+        ClusterNode localNode = getLocalNode();
+
+        if (localNode == null)
+            throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" +
+                    getSpiContext().isStopping() + ']');
+
+        if (node.id().equals(localNode.id()))
             notifyListener(node.id(), msg, NOOP);
         else {
             GridCommunicationClient client = null;
@@ -2263,8 +2269,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
+                    ClusterNode localNode = getLocalNode();
+
+                    if (localNode == null)
+                        throw new IgniteCheckedException("Local node is not started or fully initialized [isStopping=" +
+                            getSpiContext().isStopping() + ']');
+
                     if (recovery != null) {
-                        HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
+                        HandshakeMessage msg = new HandshakeMessage(localNode.id(),
                             recovery.incrementConnectCount(),
                             recovery.receivedCount());
 
@@ -2415,7 +2427,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Node ID message.
      */
     private NodeIdMessage nodeIdMessage() {
-        return new NodeIdMessage(getLocalNode().id());
+        ClusterNode localNode = getLocalNode();
+
+        UUID id;
+
+        if (localNode == null) {
+            log.warning("Local node is not started or fully initialized [isStopping=" +
+                    getSpiContext().isStopping() + ']');
+
+            id = new UUID(0, 0);
+        }
+        else
+            id = localNode.id();
+
+        return new NodeIdMessage(id);
     }
 
     /** {@inheritDoc} */
@@ -2931,7 +2956,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                UUID id = getLocalNode().id();
+                ClusterNode localNode = getLocalNode();
+
+                if (localNode == null)
+                    throw new IgniteSpiException("Local node is not started or fully initialized [isStopping=" +
+                            getSpiContext().isStopping() + ']');
+
+                UUID id = localNode.id();
 
                 NodeIdMessage msg = new NodeIdMessage(id);