You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/23 17:23:03 UTC

[16/38] incubator-ignite git commit: # ignite-sprint-6 do not call ping from exchange worker (cherry picked from commit 8fa9d3d)

# ignite-sprint-6 do not call ping from exchange worker (cherry picked from commit 8fa9d3d)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/309eeb01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/309eeb01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/309eeb01

Branch: refs/heads/ignite-gg-10441
Commit: 309eeb0127546c2775f8514e1759497fccf1c8b4
Parents: 415264e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 15:37:37 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 22 09:24:54 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 64 ++++++++++++------
 .../GridCachePartitionExchangeManager.java      | 70 +++++++++-----------
 .../GridCacheAbstractFailoverSelfTest.java      |  6 +-
 3 files changed, 79 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index eef9fde..74a4512 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -323,7 +323,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * Processes failed messages.
      * @param nodeId niode id.
      * @param msg message.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
         GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
@@ -511,6 +511,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param cacheMsg Cache message to get start future.
      * @return Preloader start future.
      */
+    @SuppressWarnings("unchecked")
     private IgniteInternalFuture<Object> startFuture(GridCacheMessage cacheMsg) {
         int cacheId = cacheMsg.cacheId();
 
@@ -574,6 +575,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      *
      * @param node Node to send the message to.
      * @param msg Message to send.
+     * @param plc IO policy.
      * @throws IgniteCheckedException If sending failed.
      * @throws ClusterTopologyCheckedException If receiver left.
      */
@@ -734,6 +736,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      *
      * @param nodeId ID of node to send the message to.
      * @param msg Message to send.
+     * @param plc IO policy.
      * @throws IgniteCheckedException If sending failed.
      */
     public void send(UUID nodeId, GridCacheMessage msg, GridIoPolicy plc) throws IgniteCheckedException {
@@ -795,8 +798,41 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Sends message without retries and node ping in case of error.
+     *
+     * @param node Node to send message to.
+     * @param msg Message.
+     * @param plc IO policy.
+     * @throws IgniteCheckedException If send failed.
+     */
+    public void sendNoRetry(ClusterNode node,
+        GridCacheMessage msg,
+        GridIoPolicy plc)
+        throws IgniteCheckedException
+    {
+        assert node != null;
+        assert msg != null;
+
+        onSend(msg, null);
+
+        try {
+            cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+
+            if (log.isDebugEnabled())
+                log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
+        }
+        catch (IgniteCheckedException e) {
+            if (!cctx.discovery().alive(node.id()))
+                throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
+            else
+                throw e;
+        }
+    }
+
+    /**
      * Adds message handler.
      *
+     * @param cacheId Cache ID.
      * @param type Type of message.
      * @param c Handler.
      */
@@ -846,29 +882,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         idxClsHandlers.remove(cacheId);
 
-        for (Iterator<ListenerKey> iterator = clsHandlers.keySet().iterator(); iterator.hasNext(); ) {
-            ListenerKey key = iterator.next();
+        for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) {
+            ListenerKey key = iter.next();
 
             if (key.cacheId == cacheId)
-                iterator.remove();
+                iter.remove();
         }
     }
 
     /**
-     * @param lsnr Listener to add.
-     */
-    public void addDisconnectListener(GridDisconnectListener lsnr) {
-        cctx.kernalContext().io().addDisconnectListener(lsnr);
-    }
-
-    /**
-     * @param lsnr Listener to remove.
-     */
-    public void removeDisconnectListener(GridDisconnectListener lsnr) {
-        cctx.kernalContext().io().removeDisconnectListener(lsnr);
-    }
-
-    /**
      * @param msgCls Message class to check.
      * @return Message index.
      */
@@ -1029,11 +1051,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int result = cacheId;
+            int res = cacheId;
 
-            result = 31 * result + msgCls.hashCode();
+            res = 31 * res + msgCls.hashCode();
 
-            return result;
+            return res;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ad4cf50..edd0ad7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -567,27 +567,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         Collection<ClusterNode> rmts = null;
 
-        try {
-            // If this is the oldest node.
-            if (oldest.id().equals(cctx.localNodeId())) {
-                rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
-
-                if (log.isDebugEnabled())
-                    log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
+        // If this is the oldest node.
+        if (oldest.id().equals(cctx.localNodeId())) {
+            rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
 
-                sendAllPartitions(rmts);
-            }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Refreshing local partitions from non-oldest node: " +
-                        cctx.localNodeId());
+            if (log.isDebugEnabled())
+                log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
 
-                sendLocalPartitions(oldest, null);
-            }
+            sendAllPartitions(rmts);
         }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to refresh partition map [oldest=" + oldest.id() + ", rmts=" + U.nodeIds(rmts) +
-                ", loc=" + cctx.localNodeId() + ']', e);
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Refreshing local partitions from non-oldest node: " +
+                    cctx.localNodeId());
+
+            sendLocalPartitions(oldest, null);
         }
     }
 
@@ -616,10 +610,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param nodes Nodes.
      * @return {@code True} if message was sent, {@code false} if node left grid.
-     * @throws IgniteCheckedException If failed.
      */
-    private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes)
-        throws IgniteCheckedException {
+    private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -634,7 +626,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
 
-        cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
+        for (ClusterNode node : nodes) {
+            try {
+                cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
+                        node.id() + ", msg=" + m + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions full message [node=" + node + ']', e);
+            }
+        }
 
         return true;
     }
@@ -642,11 +646,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param node Node.
      * @param id ID.
-     * @return {@code True} if message was sent, {@code false} if node left grid.
-     * @throws IgniteCheckedException If failed.
      */
-    private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
-        throws IgniteCheckedException {
+    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
             cctx.versions().last());
@@ -669,16 +670,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
 
         try {
-            cctx.io().send(node, m, SYSTEM_POOL);
-
-            return true;
+            cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignore) {
             if (log.isDebugEnabled())
                 log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
                     node.id() + ", msg=" + m + ']');
-
-            return false;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e);
         }
     }
 
@@ -903,13 +903,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
 
         try {
-            try {
-                sendLocalPartitions(node, msg.exchangeId());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send local partition map to node [nodeId=" + node.id() + ", exchId=" +
-                    msg.exchangeId() + ']', e);
-            }
+            sendLocalPartitions(node, msg.exchangeId());
         }
         finally {
             leaveBusy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index b6cd88e..3c74674 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -74,9 +74,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
 
         TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
 
-        discoSpi.setSocketTimeout(10_000);
-        discoSpi.setAckTimeout(10_000);
+        discoSpi.setSocketTimeout(30_000);
+        discoSpi.setAckTimeout(30_000);
         discoSpi.setNetworkTimeout(60_000);
+        discoSpi.setHeartbeatFrequency(30_000);
+        discoSpi.setReconnectCount(2);
 
         return cfg;
     }