You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/01 22:07:43 UTC

[16/18] ignite git commit: IGNITE-2263: DANGEROUS!!! Reworked GridIoManager methods.

IGNITE-2263: DANGEROUS!!! Reworked GridIoManager methods.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80f15a73
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80f15a73
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80f15a73

Branch: refs/heads/ignite-2263
Commit: 80f15a731c9a1f4cbbd38d85efaeb244ad9ee4e7
Parents: 9a3d951
Author: thatcoach <pp...@list.ru>
Authored: Fri Jan 1 23:52:26 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Fri Jan 1 23:52:26 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 154 +++++++++++++------
 .../eventstorage/GridEventStorageManager.java   |  48 +++---
 .../processors/cache/GridCacheIoManager.java    |  25 ++-
 3 files changed, 140 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 42f8dae..4fd0b67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,26 +17,6 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -64,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
@@ -82,20 +63,32 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
-import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
-import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.*;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.*;
 
 /**
  * Grid communication manager.
@@ -1224,11 +1217,33 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout
+    ) throws IgniteCheckedException {
+        sendOrderedMessage(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc, timeout, skipOnTimeout);
+    }
+
+    /**
+     * @param nodes Destination nodes.
+     * @param pred Predicate applied to each destination node; the message is sent only to nodes it accepts.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendOrderedMessage(
+        Collection<? extends ClusterNode> nodes,
+        IgnitePredicate<? super ClusterNode> pred,
+        Object topic,
+        Message msg,
+        byte plc,
+        long timeout,
+        boolean skipOnTimeout
     )
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+        send(nodes, pred, topic, -1, msg, plc, true, timeout, skipOnTimeout);
     }
 
     /**
@@ -1257,11 +1272,46 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, false, 0, false);
+        send(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc);
+    }
+
+    /**
+     * @param nodes Destination nodes.
+     * @param pred Predicate applied to each destination node; the message is sent only to nodes it accepts.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(
+        Collection<? extends ClusterNode> nodes,
+        IgnitePredicate<? super ClusterNode> pred,
+        Object topic,
+        Message msg,
+        byte plc
+    ) throws IgniteCheckedException {
+        send(nodes, pred, topic, -1, msg, plc, false, 0, false);
+    }
+
+    /**
+     * @param nodes Destination nodes.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(
+        Collection<? extends ClusterNode> nodes,
+        GridTopic topic,
+        Message msg,
+        byte plc
+    ) throws IgniteCheckedException {
+        send(nodes, F.<ClusterNode>alwaysTrue(), topic, msg, plc);
     }
 
     /**
      * @param nodes Destination nodes.
+     * @param pred Predicate applied to each destination node; the message is sent only to nodes it accepts.
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
@@ -1269,11 +1319,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(
         Collection<? extends ClusterNode> nodes,
+        IgnitePredicate<? super ClusterNode> pred,
         GridTopic topic,
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(nodes, pred, topic, topic.ordinal(), msg, plc, false, 0, false);
     }
 
     /**
@@ -1372,10 +1423,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 
-            Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
-
-            if (!rmtNodes.isEmpty())
-                send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+            send(nodes, F.remoteNodes(locNodeId), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
             // Will call local listeners in current thread synchronously, so must go the last
             // to allow remote nodes execute the requested operation in parallel.
@@ -1452,6 +1500,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
     /**
      * @param nodes Destination nodes.
+     * @param pred Predicate for nodes (must not be {@code null}); only nodes it accepts receive the message.
      * @param topic Topic to send the message to.
      * @param topicOrd Topic ordinal value.
      * @param msg Message to send.
@@ -1463,6 +1512,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     private void send(
         Collection<? extends ClusterNode> nodes,
+        IgnitePredicate<? super ClusterNode> pred,
         Object topic,
         int topicOrd,
         Message msg,
@@ -1476,24 +1526,32 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert msg != null;
 
         if (!ordered)
-            assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+            assert F.find(nodes, null, F.and(pred, F.localNode(locNodeId))) == null :
                 "Internal Ignite code should never call the method with local node in a node list.";
 
         try {
             // Small optimization, as communication SPIs may have lighter implementation for sending
             // messages to one node vs. many.
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes)
+            boolean sent = false;
+
+            for (ClusterNode node : nodes) {
+                if (pred.apply(node)) {
                     send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
+
+                    sent = true;
+                }
+            }
+
+            if (!sent) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
+                        msg + ", policy=" + plc + ']');
             }
-            else if (log.isDebugEnabled())
-                log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
-                    msg + ", policy=" + plc + ']');
         }
         catch (IgniteSpiException e) {
             throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " +
                 "TCP connection cannot be established due to firewall issues) " +
-                "[nodes=" + nodes + ", topic=" + topic +
+                "[nodes=" + F.retain(nodes, true, pred) + ", topic=" + topic +
                 ", msg=" + msg + ", policy=" + plc + ']', e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index ea01e52..4bb9d13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -17,21 +17,6 @@
 
 package org.apache.ignite.internal.managers.eventstorage;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -64,13 +49,25 @@ import org.apache.ignite.spi.eventstorage.EventStorageSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.events.EventType.EVTS_ALL;
-import static org.apache.ignite.events.EventType.EVTS_DISCOVERY_ALL;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_EVENT;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  * Grid event storage SPI manager.
@@ -1015,15 +1012,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         GridEventStorageMessage msg, byte plc) throws IgniteCheckedException {
         ClusterNode locNode = F.find(nodes, null, F.localNode(ctx.localNodeId()));
 
-        Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(ctx.localNodeId()));
-
         if (locNode != null)
             ctx.io().send(locNode, topic, msg, plc);
 
-        if (!rmtNodes.isEmpty()) {
+        // Avoid marshaling if possible.
+        if (nodes.size() > (locNode != null ? 1 : 0)) {
             msg.responseTopicBytes(marsh.marshal(msg.responseTopic()));
 
-            ctx.io().send(rmtNodes, topic, msg, plc);
+            ctx.io().send(nodes, F.remoteNodes(ctx.localNodeId()), topic, msg, plc);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/80f15a73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 4c9cdf2..02cbc04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,14 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -71,7 +63,16 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.internal.GridTopic.*;
 
 /**
  * Cache communication manager.
@@ -705,13 +706,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         while (cnt < retryCnt) {
             try {
-                Collection<? extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() {
+                cctx.gridIO().send(nodes, new P1<ClusterNode>() {
                     @Override public boolean apply(ClusterNode e) {
                         return !leftIds.contains(e.id());
                     }
-                });
-
-                cctx.gridIO().send(nodesView, TOPIC_CACHE, msg, plc);
+                }, TOPIC_CACHE, msg, plc);
 
                 boolean added = false;