You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/19 03:02:49 UTC

[6/6] ignite git commit: IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.

IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2412b173
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2412b173
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2412b173

Branch: refs/heads/ignite-1171
Commit: 2412b1734cfd65057565a653daa8816a7d31827e
Parents: 6a11189
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 18:01:58 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 18:01:58 2015 -0700

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  5 ++-
 .../discovery/DiscoverySpiCustomMessage.java    | 12 +++---
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 41 ++++++++++++--------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  6 +--
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |  2 +-
 6 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..6d95b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -811,4 +811,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 144a0fd..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
                     timeoutHelper.checkFailureTimeoutReached(e))) {
-                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
-                        failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+                            failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
 
                     throw e;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
 package org.apache.ignite.spi.discovery;
 
 import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Message to send across ring.
  *
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
      * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
      */
     public boolean isMutable();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92f90a1..e40bafb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1880,15 +1880,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Clears pending messages.
-         */
-        void clear() {
-            msgs.clear();
-
-            discardId = null;
-        }
-
-        /**
          * Discards message with provided ID and all before it.
          *
          * @param id Discarded message ID.
@@ -1943,7 +1934,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private long connCheckThreshold;
 
         /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
-        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
 
         /** Collection to track joining nodes. */
         private Set<UUID> joiningNodes = new HashSet<>();
@@ -2053,6 +2044,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             sendHeartbeatMessage();
 
             checkHeartbeatsReceiving();
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -2337,7 +2330,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
 
-                                        continue;
+                                        if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
+                                            continue;
                                     }
 
                                     long tstamp = U.currentTimeMillis();
@@ -2361,13 +2355,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
-                                        log.debug("Pending message has been sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                        log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     if (debugMode)
-                                        debugLog("Pending message has been sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                        debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     // Resetting timeout control object to create a new one for the next bunch of
@@ -3236,6 +3230,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 n.visible(true);
                             }
 
+                            joiningNodes.clear();
+
                             locNode.setAttributes(node.attributes());
 
                             locNode.visible(true);
@@ -3251,7 +3247,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
-                            pendingMsgs.discard(msg.discardedMessageId());
+                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
 
                             // Clear data to minimize message size.
                             msg.messages(null, null);
@@ -4175,8 +4171,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             else {
-                if (msg.verified())
+                TcpDiscoverySpiState state0;
+
+                synchronized (mux) {
+                    state0 = spiState;
+                }
+
+                assert !(msg.verified() && state0 == CONNECTING);
+
+                if (msg.verified() && state0 == CONNECTED) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+
                     notifyDiscoveryListener(msg);
+                }
 
                 if (ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2412b173/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 4f3c9a9..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
-}
\ No newline at end of file
+}