You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/01 22:07:43 UTC
[16/18] ignite git commit: IGNITE-2263: DANGEROUS!!! Reworked
GridIoManager methods.
IGNITE-2263: DANGEROUS!!! Reworked GridIoManager methods.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80f15a73
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80f15a73
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80f15a73
Branch: refs/heads/ignite-2263
Commit: 80f15a731c9a1f4cbbd38d85efaeb244ad9ee4e7
Parents: 9a3d951
Author: thatcoach <pp...@list.ru>
Authored: Fri Jan 1 23:52:26 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Fri Jan 1 23:52:26 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 154 +++++++++++++------
.../eventstorage/GridEventStorageManager.java | 48 +++---
.../processors/cache/GridCacheIoManager.java | 25 ++-
3 files changed, 140 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 42f8dae..4fd0b67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,26 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -64,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
@@ -82,20 +63,32 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
-import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.*;
/**
* Grid communication manager.
@@ -1224,11 +1217,33 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
byte plc,
long timeout,
boolean skipOnTimeout
+ ) throws IgniteCheckedException {
+ sendOrderedMessage(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc, timeout, skipOnTimeout);
+ }
+
+ /**
+ * @param nodes Destination node.
+ * @param pred Predicate.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ Collection<? extends ClusterNode> nodes,
+ IgnitePredicate<? super ClusterNode> pred,
+ Object topic,
+ Message msg,
+ byte plc,
+ long timeout,
+ boolean skipOnTimeout
)
throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+ send(nodes, pred, topic, -1, msg, plc, true, timeout, skipOnTimeout);
}
/**
@@ -1257,11 +1272,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc
) throws IgniteCheckedException {
- send(nodes, topic, -1, msg, plc, false, 0, false);
+ send(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc);
+ }
+
+ /**
+ * @param nodes Destination node.
+ * @param pred Predicate.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(
+ Collection<? extends ClusterNode> nodes,
+ IgnitePredicate<? super ClusterNode> pred,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ send(nodes, pred, topic, -1, msg, plc, false, 0, false);
+ }
+
+ /**
+ * @param nodes Destination nodes.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void send(
+ Collection<? extends ClusterNode> nodes,
+ GridTopic topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ send(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc);
}
/**
* @param nodes Destination nodes.
+ * @param pred Predicate.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
@@ -1269,11 +1319,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(
Collection<? extends ClusterNode> nodes,
+ IgnitePredicate<? super ClusterNode> pred,
GridTopic topic,
Message msg,
byte plc
) throws IgniteCheckedException {
- send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(nodes, pred, topic, topic.ordinal(), msg, plc, false, 0, false);
}
/**
@@ -1372,10 +1423,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
- Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
-
- if (!rmtNodes.isEmpty())
- send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ send(nodes, F.remoteNodes(locNodeId), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
// Will call local listeners in current thread synchronously, so must go the last
// to allow remote nodes execute the requested operation in parallel.
@@ -1452,6 +1500,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/**
* @param nodes Destination nodes.
+ * @param pred Optional predicate for nodes.
* @param topic Topic to send the message to.
* @param topicOrd Topic ordinal value.
* @param msg Message to send.
@@ -1463,6 +1512,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
private void send(
Collection<? extends ClusterNode> nodes,
+ IgnitePredicate<? super ClusterNode> pred,
Object topic,
int topicOrd,
Message msg,
@@ -1476,24 +1526,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert msg != null;
if (!ordered)
- assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+ assert F.find(nodes, null, F.and(pred, F.localNode(locNodeId))) == null :
"Internal Ignite code should never call the method with local node in a node list.";
try {
// Small optimization, as communication SPIs may have lighter implementation for sending
// messages to one node vs. many.
- if (!nodes.isEmpty()) {
- for (ClusterNode node : nodes)
+ boolean sent = false;
+
+ for (ClusterNode node : nodes) {
+ if (pred.apply(node)) {
send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+
+ sent = true;
+ }
+ }
+
+ if (!sent) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
+ msg + ", policy=" + plc + ']');
}
- else if (log.isDebugEnabled())
- log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
- msg + ", policy=" + plc + ']');
}
catch (IgniteSpiException e) {
throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " +
"TCP connection cannot be established due to firewall issues) " +
- "[nodes=" + nodes + ", topic=" + topic +
+ "[nodes=" + F.retain(nodes, true, pred) + ", topic=" + topic +
", msg=" + msg + ", policy=" + plc + ']', e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index ea01e52..4bb9d13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -17,21 +17,6 @@
package org.apache.ignite.internal.managers.eventstorage;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -64,13 +49,25 @@ import org.apache.ignite.spi.eventstorage.EventStorageSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.events.EventType.EVTS_ALL;
-import static org.apache.ignite.events.EventType.EVTS_DISCOVERY_ALL;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_EVENT;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
* Grid event storage SPI manager.
@@ -1015,15 +1012,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
GridEventStorageMessage msg, byte plc) throws IgniteCheckedException {
ClusterNode locNode = F.find(nodes, null, F.localNode(ctx.localNodeId()));
- Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
-
if (locNode != null)
ctx.io().send(locNode, topic, msg, plc);
- if (!rmtNodes.isEmpty()) {
+ // Avoid marshaling if possible.
+ if (nodes.size() > (locNode != null ? 1 : 0)) {
msg.responseTopicBytes(marsh.marshal(msg.responseTopic()));
- ctx.io().send(rmtNodes, topic, msg, plc);
+ ctx.io().send(nodes, F.remoteNodes(ctx.localNodeId()), topic, msg, plc);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 4c9cdf2..02cbc04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,14 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -71,7 +63,16 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.internal.GridTopic.*;
/**
* Cache communication manager.
@@ -705,13 +706,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
while (cnt < retryCnt) {
try {
- Collection<? extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() {
+ cctx.gridIO().send(nodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode e) {
return !leftIds.contains(e.id());
}
- });
-
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+ }, TOPIC_CACHE, msg, plc);
boolean added = false;