You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 16:02:10 UTC

[08/47] incubator-ignite git commit: # IGNITE-831 Done.

# IGNITE-831 Done.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e028de86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e028de86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e028de86

Branch: refs/heads/ignite-709_3
Commit: e028de86ee375ad7c96a8eeac333487258534cff
Parents: ac7597e
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 5 19:14:43 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 5 19:14:43 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousMessageType.java   |  12 -
 .../continuous/GridContinuousProcessor.java     | 544 +++----------------
 .../StartRoutineAckDiscoveryMessage.java        |  62 +++
 .../StartRoutineDiscoveryMessage.java           |  89 +++
 .../StopRoutineAckDiscoveryMessage.java         |  49 ++
 .../continuous/StopRoutineDiscoveryMessage.java |  56 ++
 .../ignite/spi/discovery/DiscoverySpi.java      |   4 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java |   6 +-
 .../tcp/TcpClientDiscoverySelfTest.java         |   4 +-
 9 files changed, 338 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
index eb33613..1b79430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
@@ -23,18 +23,6 @@ import org.jetbrains.annotations.*;
  * Continuous processor message types.
  */
 enum GridContinuousMessageType {
-    /** Consume start request. */
-    MSG_START_REQ,
-
-    /** Consume start acknowledgement. */
-    MSG_START_ACK,
-
-    /** Consume stop request. */
-    MSG_STOP_REQ,
-
-    /** Consume stop acknowledgement. */
-    MSG_STOP_ACK,
-
     /** Remote event notification. */
     MSG_EVT_NOTIFICATION,
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 41f5940..d71609b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -64,21 +65,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Start futures. */
     private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>();
 
-    /** Start ack wait lists. */
-    private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>();
-
     /** Stop futures. */
     private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>();
 
-    /** Stop ack wait lists. */
-    private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>();
-
     /** Threads started by this processor. */
     private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>();
 
-    /** Pending start requests. */
-    private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>();
-
     /** */
     private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
 
@@ -91,18 +83,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Lock for stop process. */
     private final Lock stopLock = new ReentrantLock();
 
+    /** Marshaller. */
+    private Marshaller marsh;
+
     /** Delay in milliseconds between retries. */
     private long retryDelay = 1000;
 
     /** Number of retries using to send messages. */
     private int retryCnt = 3;
 
-    /** Acknowledgement timeout. */
-    private long ackTimeout;
-
-    /** Marshaller. */
-    private Marshaller marsh;
-
     /**
      * @param ctx Kernal context.
      */
@@ -117,15 +106,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         retryDelay = ctx.config().getNetworkSendRetryDelay();
         retryCnt = ctx.config().getNetworkSendRetryCount();
-        ackTimeout = ctx.config().getNetworkTimeout();
-
-        if (ackTimeout < retryDelay * retryCnt) {
-            U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " +
-                "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" +
-                ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']');
-
-            ackTimeout = retryDelay * retryCnt;
-        }
 
         marsh = ctx.config().getMarshaller();
 
@@ -136,111 +116,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                Collection<GridContinuousMessage> reqs;
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
 
-                pendingLock.lock();
+                // Unregister handlers created by left node.
+                for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+                    UUID routineId = e.getKey();
+                    RemoteRoutineInfo info = e.getValue();
 
-                try {
-                    // Remove pending requests to send to joined node
-                    // (if node is left or failed, they are dropped).
-                    reqs = pending.remove(nodeId);
-                }
-                finally {
-                    pendingLock.unlock();
+                    if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
+                        unregisterRemote(routineId);
                 }
 
-                switch (evt.type()) {
-                    case EVT_NODE_JOINED:
-                        if (reqs != null) {
-                            UUID routineId = null;
+                for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
+                    SyncMessageAckFuture fut = e.getValue();
 
-                            // Send pending requests.
-                            try {
-                                for (GridContinuousMessage req : reqs) {
-                                    routineId = req.routineId();
+                    if (fut.nodeId().equals(nodeId)) {
+                        SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
 
-                                    sendWithRetries(nodeId, req, null);
-                                }
-                            }
-                            catch (ClusterTopologyCheckedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to send pending start request to node (is node alive?): " +
-                                        nodeId);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to send pending start request to node: " + nodeId, e);
+                        if (fut0 != null) {
+                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                "Node left grid while sending message to: " + nodeId);
 
-                                completeStartFuture(routineId);
-                            }
-                        }
-
-                        break;
-
-                    case EVT_NODE_LEFT:
-                    case EVT_NODE_FAILED:
-                        // Do not wait for start acknowledgements from left node.
-                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) {
-                            Collection<UUID> nodeIds = e.getValue();
-
-                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
-                                if (nodeId.equals(it.next())) {
-                                    it.remove();
-
-                                    break;
-                                }
-                            }
-
-                            if (nodeIds.isEmpty())
-                                completeStartFuture(e.getKey());
+                            fut0.onDone(err);
                         }
+                    }
+                }
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-                        // Do not wait for stop acknowledgements from left node.
-                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) {
-                            Collection<UUID> nodeIds = e.getValue();
+        ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
+            new CustomEventListener<StartRoutineDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) {
+                    if (!snd.id().equals(ctx.localNodeId()))
+                        processStartRequest(snd.id(), msg);
+                }
+            });
 
-                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
-                                if (nodeId.equals(it.next())) {
-                                    it.remove();
+        ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
+            new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) {
+                    StartFuture fut = startFuts.remove(msg.routineId());
 
-                                    break;
-                                }
-                            }
+                    if (fut != null) {
+                        if (msg.errs().isEmpty())
+                            fut.onRemoteRegistered();
+                        else {
+                            IgniteCheckedException firstEx = F.first(msg.errs().values());
 
-                            if (nodeIds.isEmpty())
-                                completeStopFuture(e.getKey());
-                        }
-
-                        // Unregister handlers created by left node.
-                        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
-                            UUID routineId = e.getKey();
-                            RemoteRoutineInfo info = e.getValue();
+                            fut.onDone(firstEx);
 
-                            if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
-                                unregisterRemote(routineId);
+                            stopRoutine(msg.routineId());
                         }
 
-                        for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
-                            SyncMessageAckFuture fut = e.getValue();
-
-                            if (fut.nodeId().equals(nodeId)) {
-                                SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+                    }
+                }
+            });
 
-                                if (fut0 != null) {
-                                    ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
-                                        "Node left grid while sending message to: " + nodeId);
+        ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
+            new CustomEventListener<StopRoutineDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+                    if (!snd.id().equals(ctx.localNodeId())) {
+                        UUID routineId = msg.routineId();
 
-                                    fut0.onDone(err);
-                                }
-                            }
-                        }
+                        unregisterRemote(routineId);
+                    }
+                }
+            });
 
-                        break;
+        ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
+            new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
+                @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) {
+                    StopFuture fut = stopFuts.remove(msg.routineId());
 
-                    default:
-                        assert false : "Unexpected event received: " + evt.shortDisplay();
+                    if (fut != null)
+                        fut.onDone();
                 }
-            }
-        }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+            });
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object obj) {
@@ -258,26 +209,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 }
 
                 switch (msg.type()) {
-                    case MSG_START_REQ:
-                        processStartRequest(nodeId, msg);
-
-                        break;
-
-                    case MSG_START_ACK:
-                        processStartAck(nodeId, msg);
-
-                        break;
-
-                    case MSG_STOP_REQ:
-                        processStopRequest(nodeId, msg);
-
-                        break;
-
-                    case MSG_STOP_ACK:
-                        processStopAck(nodeId, msg);
-
-                        break;
-
                     case MSG_EVT_NOTIFICATION:
                         processNotification(nodeId, msg);
 
@@ -323,9 +254,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             pendingLock.lock();
 
             try {
-                // Create empty pending set.
-                pending.put(nodeId, new HashSet<GridContinuousMessage>());
-
                 DiscoveryData data = new DiscoveryData(ctx.localNodeId());
 
                 // Collect listeners information (will be sent to
@@ -486,32 +414,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             });
         }
 
-        Collection<? extends ClusterNode> nodes;
-        Collection<UUID> nodeIds;
-
         pendingLock.lock();
 
         try {
-            // Nodes that participate in routine (request will be sent to these nodes directly).
-            nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId())));
-
-            // Stop with exception if projection is empty.
-            if (nodes.isEmpty() && !locIncluded) {
-                return new GridFinishedFuture<>(
-                    new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty)."));
-            }
-
-            // IDs of nodes where request will be sent.
-            nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()));
-
-            // If there are currently joining nodes, add request to their pending lists.
-            // Node IDs set is updated to make sure that we wait for acknowledgement from
-            // these nodes.
-            for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) {
-                if (nodeIds.add(e.getKey()))
-                    e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false));
-            }
-
             // Register routine locally.
             locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
         }
@@ -521,68 +426,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
-        if (!nodeIds.isEmpty()) {
-            // Wait for acknowledgements.
-            waitForStartAck.put(routineId, nodeIds);
-
-            startFuts.put(routineId, fut);
-
-            // Register acknowledge timeout (timeout object will be removed when
-            // future is completed).
-            fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) {
-                @Override public void onTimeout() {
-                    // Stop waiting for acknowledgements.
-                    Collection<UUID> ids = waitForStartAck.remove(routineId);
-
-                    if (ids != null) {
-                        StartFuture f = startFuts.remove(routineId);
+        startFuts.put(routineId, fut);
 
-                        assert f != null;
-
-                        // If there are still nodes without acknowledgements,
-                        // Stop routine with exception. Continue and complete
-                        // future otherwise.
-                        if (!ids.isEmpty()) {
-                            f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " +
-                                "expired): " + ids + ". Will unregister all continuous listeners."));
-
-                            stopRoutine(routineId);
-                        }
-                        else
-                            f.onRemoteRegistered();
-                    }
-                }
-            });
+        try {
+            ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
         }
+        catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+            startFuts.remove(routineId);
 
-        if (!nodes.isEmpty()) {
-            // Do not send projection predicate (nodes already filtered).
-            reqData.projectionPredicate(null);
-            reqData.projectionPredicateBytes(null);
-
-            // Send start requests.
-            try {
-                GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false);
-
-                sendWithRetries(nodes, req, null);
-            }
-            catch (IgniteCheckedException e) {
-                startFuts.remove(routineId);
-                waitForStartAck.remove(routineId);
-
-                fut.onDone(e);
-
-                stopRoutine(routineId);
+            locInfos.remove(routineId);
 
-                locIncluded = false;
-            }
-        }
-        else {
-            // There are no remote nodes, but we didn't throw topology exception.
-            assert locIncluded;
+            fut.onDone(e);
 
-            // Do not wait anything from remote nodes.
-            fut.onRemoteRegistered();
+            return fut;
         }
 
         // Register local handler if needed.
@@ -640,61 +496,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             // Unregister handler locally.
             unregisterHandler(routineId, routine.hnd, true);
 
-            pendingLock.lock();
-
-            try {
-                // Remove pending requests for this routine.
-                for (Collection<GridContinuousMessage> msgs : pending.values()) {
-                    Iterator<GridContinuousMessage> it = msgs.iterator();
-
-                    while (it.hasNext()) {
-                        if (it.next().routineId().equals(routineId))
-                            it.remove();
-                    }
-                }
-            }
-            finally {
-                pendingLock.unlock();
-            }
-
-            // Nodes where to send stop requests.
-            Collection<? extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(),
-                F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId())));
-
-            if (!nodes.isEmpty()) {
-                // Wait for acknowledgements.
-                waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())));
-
-                // Register acknowledge timeout (timeout object will be removed when
-                // future is completed).
-                fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId,
-                    new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false)));
-
-                // Send stop requests.
-                try {
-                    for (ClusterNode node : nodes) {
-                        try {
-                            sendWithRetries(node.id(),
-                                new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false),
-                                null);
-                        }
-                        catch (ClusterTopologyCheckedException ignored) {
-                            U.warn(log, "Failed to send stop request (node left topology): " + node.id());
-                        }
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    stopFuts.remove(routineId);
-                    waitForStopAck.remove(routineId);
-
-                    fut.onDone(e);
-                }
-            }
-            else {
-                stopFuts.remove(routineId);
-
-                fut.onDone();
-            }
+            ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
         }
 
         return fut;
@@ -727,7 +529,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             assert info.interval == 0 || !sync;
 
             if (sync) {
-                SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId);
+                SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId);
 
                 IgniteUuid futId = IgniteUuid.randomUuid();
 
@@ -782,12 +584,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param nodeId Sender ID.
      * @param req Start request.
      */
-    private void processStartRequest(UUID nodeId, GridContinuousMessage req) {
+    private void processStartRequest(UUID nodeId, StartRoutineDiscoveryMessage req) {
         assert nodeId != null;
         assert req != null;
 
         UUID routineId = req.routineId();
-        StartRequestData data = req.data();
+        StartRequestData data = req.startRequestData();
 
         GridContinuousHandler hnd = data.handler();
 
@@ -836,100 +638,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         }
 
-        try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null);
-        }
-        catch (ClusterTopologyCheckedException ignored) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e);
-        }
+        if (err != null)
+            req.addError(ctx.localNodeId(), err);
 
         if (registered)
             hnd.onListenerRegistered(routineId, ctx);
     }
 
     /**
-     * @param nodeId Sender ID.
-     * @param ack Start acknowledgement.
-     */
-    private void processStartAck(UUID nodeId, GridContinuousMessage ack) {
-        assert nodeId != null;
-        assert ack != null;
-
-        UUID routineId = ack.routineId();
-
-        final IgniteCheckedException err = ack.data();
-
-        if (err != null) {
-            if (waitForStartAck.remove(routineId) != null) {
-                final StartFuture fut = startFuts.remove(routineId);
-
-                if (fut != null) {
-                    fut.onDone(err);
-
-                    stopRoutine(routineId);
-                }
-            }
-        }
-
-        Collection<UUID> nodeIds = waitForStartAck.get(routineId);
-
-        if (nodeIds != null) {
-            nodeIds.remove(nodeId);
-
-            if (nodeIds.isEmpty())
-                completeStartFuture(routineId);
-        }
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Stop request.
-     */
-    private void processStopRequest(UUID nodeId, GridContinuousMessage req) {
-        assert nodeId != null;
-        assert req != null;
-
-        UUID routineId = req.routineId();
-
-        unregisterRemote(routineId);
-
-        try {
-            sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null);
-        }
-        catch (ClusterTopologyCheckedException ignored) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e);
-        }
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param ack Stop acknowledgement.
-     */
-    private void processStopAck(UUID nodeId, GridContinuousMessage ack) {
-        assert nodeId != null;
-        assert ack != null;
-
-        UUID routineId = ack.routineId();
-
-        Collection<UUID> nodeIds = waitForStopAck.get(routineId);
-
-        if (nodeIds != null) {
-            nodeIds.remove(nodeId);
-
-            if (nodeIds.isEmpty())
-                completeStopFuture(routineId);
-        }
-    }
-
-    /**
      * @param msg Message.
      */
     private void processMessageAck(GridContinuousMessage msg) {
@@ -972,36 +688,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param routineId Consume ID.
-     */
-    private void completeStartFuture(UUID routineId) {
-        assert routineId != null;
-
-        if (waitForStartAck.remove(routineId) != null) {
-            StartFuture fut = startFuts.remove(routineId);
-
-            assert fut != null;
-
-            fut.onRemoteRegistered();
-        }
-    }
-
-    /**
-     * @param routineId Consume ID.
-     */
-    private void completeStopFuture(UUID routineId) {
-        assert routineId != null;
-
-        if (waitForStopAck.remove(routineId) != null) {
-            GridFutureAdapter <?> fut = stopFuts.remove(routineId);
-
-            assert fut != null;
-
-            fut.onDone();
-        }
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param routineId Consume ID.
      * @param hnd Handler.
@@ -1589,13 +1275,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         private volatile GridTimeoutObject timeoutObj;
 
         /**
-         * Required by {@link Externalizable}.
-         */
-        public StartFuture() {
-            // No-op.
-        }
-
-        /**
          * @param ctx Kernal context.
          * @param routineId Consume ID.
          */
@@ -1706,10 +1385,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         private UUID nodeId;
 
         /**
-         * @param ctx Kernal context.
          * @param nodeId Master node ID.
          */
-        SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+        SyncMessageAckFuture(UUID nodeId) {
             this.nodeId = nodeId;
         }
 
@@ -1725,76 +1403,4 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             return S.toString(SyncMessageAckFuture.class, this);
         }
     }
-
-    /**
-     * Timeout object for stop process.
-     */
-    private class StopTimeoutObject extends GridTimeoutObjectAdapter {
-        /** Timeout. */
-        private final long timeout;
-
-        /** Routine ID. */
-        private final UUID routineId;
-
-        /** Request. */
-        private final GridContinuousMessage req;
-
-        /**
-         * @param timeout Timeout.
-         * @param routineId Routine ID.
-         * @param req Request.
-         */
-        protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) {
-            super(timeout);
-
-            assert routineId != null;
-            assert req != null;
-
-            this.timeout = timeout;
-            this.routineId = routineId;
-            this.req = req;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            Collection<UUID> ids = waitForStopAck.remove(routineId);
-
-            if (ids != null) {
-                U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids +
-                    ". Will retry.");
-
-                StopFuture f = stopFuts.get(routineId);
-
-                if (f != null) {
-                    if (!ids.isEmpty()) {
-                        waitForStopAck.put(routineId, ids);
-
-                        // Resend requests.
-                        for (UUID id : ids) {
-                            try {
-                                sendWithRetries(id, req, null);
-                            }
-                            catch (ClusterTopologyCheckedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to resend stop request to node (is node alive?): " + id);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to resend stop request to node: " + id, e);
-
-                                ids.remove(id);
-
-                                if (ids.isEmpty())
-                                    f.onDone(e);
-                            }
-                        }
-
-                        // Reschedule timeout.
-                        ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req));
-                    }
-                    else if (stopFuts.remove(routineId) != null)
-                        f.onDone();
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..4e5bb9c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+    /** Routine ID. */
+    private final UUID routineId;
+
+    /** */
+    private final Map<UUID, IgniteCheckedException> errs;
+
+    /**
+     * @param routineId Routine id.
+     * @param errs Errs.
+     */
+    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+        this.routineId = routineId;
+        this.errs = new HashMap<>(errs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean forwardMinorVersion() {
+        return false;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Errs.
+     */
+    public Map<UUID, IgniteCheckedException> errs() {
+        return errs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..7fa78b6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StartRoutineDiscoveryMessage implements RingEndAwareCustomMessage {
+    /** Routine ID. */
+    private final UUID routineId;
+
+    /** */
+    private final StartRequestData startReqData;
+
+    /** */
+    private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
+
+    /**
+     * @param routineId Routine id.
+     * @param startReqData Start request data.
+     */
+    public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
+        this.routineId = routineId;
+        this.startReqData = startReqData;
+    }
+
+
+
+    /** {@inheritDoc} */
+    @Override public boolean forwardMinorVersion() {
+        return false;
+    }
+
+    /**
+     * @return Start request data.
+     */
+    public StartRequestData startRequestData() {
+        return startReqData;
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @param e Exception.
+     */
+    public void addError(UUID nodeId, IgniteCheckedException e) {
+        errs.put(nodeId, e);
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Errs.
+     */
+    public Map<UUID, IgniteCheckedException> errs() {
+        return errs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) {
+        return new StartRoutineAckDiscoveryMessage(routineId, errs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..755552b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+    /** Routine ID. */
+    private final UUID routineId;
+
+    /**
+     * @param routineId Routine id.
+     */
+    public StopRoutineAckDiscoveryMessage(UUID routineId) {
+        this.routineId = routineId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean forwardMinorVersion() {
+        return false;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..9c480a0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StopRoutineDiscoveryMessage implements RingEndAwareCustomMessage {
+    /** Routine ID. */
+    private final UUID routineId;
+
+    /**
+     * @param routineId Routine id.
+     */
+    public StopRoutineDiscoveryMessage(UUID routineId) {
+        this.routineId = routineId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean forwardMinorVersion() {
+        return false;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    public UUID routineId() {
+        return routineId;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) {
+        return new StopRoutineAckDiscoveryMessage(routineId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 247ff67..84a5f41 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.lang.*;
@@ -142,8 +143,9 @@ public interface DiscoverySpi extends IgniteSpi {
     /**
      * Sends custom message across the ring.
      * @param evt Event.
+     * @throws IgniteException if failed to marshal evt.
      */
-    public void sendCustomEvent(DiscoveryCustomMessage evt);
+    public void sendCustomEvent(DiscoveryCustomMessage evt) throws IgniteException;
 
     /**
      * Initiates failure of provided node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5a78f9f..378d5a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -42,6 +42,7 @@ import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.event.*;
 import javax.cache.integration.*;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -57,7 +58,7 @@ import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*
 /**
  * Continuous queries tests.
  */
-public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest {
+public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest implements Serializable {
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
@@ -177,10 +178,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
-            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStartAck")).size());
             assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
-            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size());
-            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size());
 
             CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index 05fb52b..d1b25df 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -539,7 +539,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
         try {
             id = msg.remoteListen(null, new MessageListener());
 
-            msgLatch = new CountDownLatch(4);
+            msgLatch = new CountDownLatch(2);
 
             msg.send(null, "Message 1");
 
@@ -550,7 +550,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
 
             checkNodes(3, 3);
 
-            msgLatch = new CountDownLatch(6);
+            msgLatch = new CountDownLatch(3);
 
             msg.send(null, "Message 2");