You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/23 15:57:56 UTC
[21/40] incubator-ignite git commit: # ignite-sprint-6 do not call
ping from exchange worker (cherry picked from commit 8fa9d3d)
# ignite-sprint-6 do not call ping from exchange worker (cherry picked from commit 8fa9d3d)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/309eeb01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/309eeb01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/309eeb01
Branch: refs/heads/IgniteReflectionFactory-doc
Commit: 309eeb0127546c2775f8514e1759497fccf1c8b4
Parents: 415264e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 15:37:37 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 22 09:24:54 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 64 ++++++++++++------
.../GridCachePartitionExchangeManager.java | 70 +++++++++-----------
.../GridCacheAbstractFailoverSelfTest.java | 6 +-
3 files changed, 79 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index eef9fde..74a4512 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -323,7 +323,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* Processes failed messages.
* @param nodeId niode id.
* @param msg message.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
@@ -511,6 +511,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param cacheMsg Cache message to get start future.
* @return Preloader start future.
*/
+ @SuppressWarnings("unchecked")
private IgniteInternalFuture<Object> startFuture(GridCacheMessage cacheMsg) {
int cacheId = cacheMsg.cacheId();
@@ -574,6 +575,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param node Node to send the message to.
* @param msg Message to send.
+ * @param plc IO policy.
* @throws IgniteCheckedException If sending failed.
* @throws ClusterTopologyCheckedException If receiver left.
*/
@@ -734,6 +736,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*
* @param nodeId ID of node to send the message to.
* @param msg Message to send.
+ * @param plc IO policy.
* @throws IgniteCheckedException If sending failed.
*/
public void send(UUID nodeId, GridCacheMessage msg, GridIoPolicy plc) throws IgniteCheckedException {
@@ -795,8 +798,41 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Sends message without retries and node ping in case of error.
+ *
+ * @param node Node to send message to.
+ * @param msg Message.
+ * @param plc IO policy.
+ * @throws IgniteCheckedException If send failed.
+ */
+ public void sendNoRetry(ClusterNode node,
+ GridCacheMessage msg,
+ GridIoPolicy plc)
+ throws IgniteCheckedException
+ {
+ assert node != null;
+ assert msg != null;
+
+ onSend(msg, null);
+
+ try {
+ cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
+ }
+ catch (IgniteCheckedException e) {
+ if (!cctx.discovery().alive(node.id()))
+ throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
+ else
+ throw e;
+ }
+ }
+
+ /**
* Adds message handler.
*
+ * @param cacheId Cache ID.
* @param type Type of message.
* @param c Handler.
*/
@@ -846,29 +882,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
idxClsHandlers.remove(cacheId);
- for (Iterator<ListenerKey> iterator = clsHandlers.keySet().iterator(); iterator.hasNext(); ) {
- ListenerKey key = iterator.next();
+ for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) {
+ ListenerKey key = iter.next();
if (key.cacheId == cacheId)
- iterator.remove();
+ iter.remove();
}
}
/**
- * @param lsnr Listener to add.
- */
- public void addDisconnectListener(GridDisconnectListener lsnr) {
- cctx.kernalContext().io().addDisconnectListener(lsnr);
- }
-
- /**
- * @param lsnr Listener to remove.
- */
- public void removeDisconnectListener(GridDisconnectListener lsnr) {
- cctx.kernalContext().io().removeDisconnectListener(lsnr);
- }
-
- /**
* @param msgCls Message class to check.
* @return Message index.
*/
@@ -1029,11 +1051,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = cacheId;
+ int res = cacheId;
- result = 31 * result + msgCls.hashCode();
+ res = 31 * res + msgCls.hashCode();
- return result;
+ return res;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ad4cf50..edd0ad7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -567,27 +567,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Collection<ClusterNode> rmts = null;
- try {
- // If this is the oldest node.
- if (oldest.id().equals(cctx.localNodeId())) {
- rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
-
- if (log.isDebugEnabled())
- log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
+ // If this is the oldest node.
+ if (oldest.id().equals(cctx.localNodeId())) {
+ rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
- sendAllPartitions(rmts);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Refreshing local partitions from non-oldest node: " +
- cctx.localNodeId());
+ if (log.isDebugEnabled())
+ log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
- sendLocalPartitions(oldest, null);
- }
+ sendAllPartitions(rmts);
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to refresh partition map [oldest=" + oldest.id() + ", rmts=" + U.nodeIds(rmts) +
- ", loc=" + cctx.localNodeId() + ']', e);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Refreshing local partitions from non-oldest node: " +
+ cctx.localNodeId());
+
+ sendLocalPartitions(oldest, null);
}
}
@@ -616,10 +610,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param nodes Nodes.
* @return {@code True} if message was sent, {@code false} if node left grid.
- * @throws IgniteCheckedException If failed.
*/
- private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes)
- throws IgniteCheckedException {
+ private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -634,7 +626,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
- cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
+ for (ClusterNode node : nodes) {
+ try {
+ cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
+ node.id() + ", msg=" + m + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partitions full message [node=" + node + ']', e);
+ }
+ }
return true;
}
@@ -642,11 +646,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param node Node.
* @param id ID.
- * @return {@code True} if message was sent, {@code false} if node left grid.
- * @throws IgniteCheckedException If failed.
*/
- private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
- throws IgniteCheckedException {
+ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
cctx.kernalContext().clientNode(),
cctx.versions().last());
@@ -669,16 +670,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
try {
- cctx.io().send(node, m, SYSTEM_POOL);
-
- return true;
+ cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignore) {
if (log.isDebugEnabled())
log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
node.id() + ", msg=" + m + ']');
-
- return false;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e);
}
}
@@ -903,13 +903,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- try {
- sendLocalPartitions(node, msg.exchangeId());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send local partition map to node [nodeId=" + node.id() + ", exchId=" +
- msg.exchangeId() + ']', e);
- }
+ sendLocalPartitions(node, msg.exchangeId());
}
finally {
leaveBusy();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/309eeb01/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index b6cd88e..3c74674 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -74,9 +74,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
- discoSpi.setSocketTimeout(10_000);
- discoSpi.setAckTimeout(10_000);
+ discoSpi.setSocketTimeout(30_000);
+ discoSpi.setAckTimeout(30_000);
discoSpi.setNetworkTimeout(60_000);
+ discoSpi.setHeartbeatFrequency(30_000);
+ discoSpi.setReconnectCount(2);
return cfg;
}