You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/27 08:21:54 UTC

[1/3] ignite git commit: zk

Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 09ab8649d -> 70bc10ba2


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/913a5373
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/913a5373
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/913a5373

Branch: refs/heads/ignite-zk
Commit: 913a5373d80a9c9d674e303ad9b50d0deb27d15a
Parents: 4e8516d
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 13:00:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 26 13:08:01 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/IgniteDiscoverySpi.java   |  6 ++++++
 .../processors/continuous/GridContinuousProcessor.java    | 10 ++++++++--
 .../apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  5 +++++
 .../ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java    |  5 +++++
 .../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java |  2 ++
 5 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index bf117f1..2e2b9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 
 /**
  *
@@ -64,4 +65,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
      * @param err Connection error.
      */
     public void resolveCommunicationError(ClusterNode node, Exception err);
+
+    /**
+     * @return {@code True} if mutable {@link DiscoverySpiCustomMessage}s are supported.
+     */
+    public boolean supportsMutableCustomEvents();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dbcea4c..eb7966e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -84,7 +85,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -164,7 +165,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        discoProtoVer = ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi ? 1 : 2;
+        DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
+
+        boolean nonMutableCustomEvts = (discoSpi instanceof IgniteDiscoverySpi) &&
+            !((IgniteDiscoverySpi)discoSpi).supportsMutableCustomEvents();
+
+        discoProtoVer = nonMutableCustomEvts ? 2 : 1;
 
         if (ctx.config().isDaemon())
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9d7dce3..781272c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2111,6 +2111,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
     }
 
     /** {@inheritDoc} */
+    @Override public boolean supportsMutableCustomEvents() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index c4370cf..ebb667f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -246,6 +246,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     }
 
     /** {@inheritDoc} */
+    @Override public boolean supportsMutableCustomEvents() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void resolveCommunicationError(ClusterNode node, Exception err) {
         impl.resolveCommunicationError(node, err);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index b8cfcfb..04bf113 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -3061,6 +3061,8 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
+        assert !(msg instanceof ZkInternalMessage) : msg;
+
         if (log.isDebugEnabled())
             log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
 


[3/3] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70bc10ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70bc10ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70bc10ba

Branch: refs/heads/ignite-zk
Commit: 70bc10ba2dd5d9c4df194b2b35f8f924a8ec9822
Parents: 830faea
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 14:31:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 27 11:21:43 2017 +0300

----------------------------------------------------------------------
 .../discovery/CustomMessageWrapper.java         |   5 +
 .../discovery/DiscoveryCustomMessage.java       |  10 +-
 .../discovery/GridDiscoveryManager.java         |  11 +
 .../cache/CacheAffinityChangeMessage.java       |   5 +
 .../cache/CacheStatisticsModeChangeMessage.java |   5 +
 .../ClientCacheChangeDiscoveryMessage.java      |   5 +
 .../ClientCacheChangeDummyDiscoveryMessage.java |   5 +
 .../cache/DynamicCacheChangeBatch.java          |   5 +
 .../binary/MetadataUpdateAcceptedMessage.java   |   5 +
 .../binary/MetadataUpdateProposedMessage.java   |   5 +
 .../cluster/ChangeGlobalStateFinishMessage.java |   5 +
 .../cluster/ChangeGlobalStateMessage.java       |   5 +
 .../continuous/AbstractContinuousMessage.java   |   5 +
 .../continuous/GridContinuousProcessor.java     |   7 +-
 .../StartRoutineDiscoveryMessageV2.java         |   2 +-
 .../StopRoutineAckDiscoveryMessage.java         |   5 +
 .../marshaller/MappingAcceptedMessage.java      |   5 +
 .../marshaller/MappingProposedMessage.java      |   5 +
 .../message/SchemaFinishDiscoveryMessage.java   |   5 +
 .../message/SchemaProposeDiscoveryMessage.java  |   5 +
 .../discovery/DiscoverySpiCustomMessage.java    |  15 +-
 ...kCommunicationErrorResolveFinishMessage.java |   5 +
 ...ZkCommunicationErrorResolveStartMessage.java |   5 +
 .../zk/internal/ZkDiscoveryCustomEventData.java |  19 +-
 .../zk/internal/ZkDiscoveryEventsData.java      |  28 +-
 .../zk/internal/ZkForceNodeFailMessage.java     |  11 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |   8 +-
 .../zk/internal/ZkNoServersMessage.java         |   5 +
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 324 +++++++++++-------
 .../ZookeeperDiscoverySpiBasicTest.java         | 332 +++++++++++++++++++
 30 files changed, 696 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 8f56248..133c727 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -49,6 +49,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
         return delegate.isMutable();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return delegate.stopProcess();
+    }
+
     /**
      * @return Delegate.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index c708c62..6ed2096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery;
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.jetbrains.annotations.Nullable;
@@ -87,11 +88,18 @@ public interface DiscoveryCustomMessage extends Serializable {
     @Nullable public DiscoveryCustomMessage ackMessage();
 
     /**
-     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
      */
     public boolean isMutable();
 
     /**
+     * See {@link DiscoverySpiCustomMessage#stopProcess()}.
+     *
+     * @return {@code True} if message should not be sent to other nodes after it was processed on the coordinator.
+     */
+    public boolean stopProcess();
+
+    /**
      * Creates new discovery cache if message caused topology version change.
      *
      * @param mgr Discovery manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9002233..eb6c2e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2394,6 +2394,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @return {@code True} if configured {@link DiscoverySpi} does not support mutable custom messages.
+     */
+    public boolean unmutableCustomMessages() {
+        DiscoverySpi spi = getSpi();
+
+        return (spi instanceof IgniteDiscoverySpi) &&
+            !((IgniteDiscoverySpi)spi).supportsMutableCustomEvents();
+
+    }
+
+    /**
      * @param node Problem node.
      * @param err Error.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index fe1014c..937a889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -156,6 +156,11 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         return discoCache.copy(topVer, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
index 40bcfaf..e33256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
@@ -101,6 +101,11 @@ public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index e35d80e..ae76c95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -173,6 +173,11 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 6ed3ecc..4ce0c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -105,6 +105,11 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 83459a5..d85e29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -77,6 +77,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
index 0416746..df64613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -71,6 +71,11 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index f9bd660..84e32e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -134,6 +134,11 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
index a1fbacf..c56c0eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -83,6 +83,11 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 6a642bc..f10a652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -116,6 +116,11 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index e9754d1..928c619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -63,6 +63,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eb7966e..899af48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -165,12 +165,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
-
-        boolean nonMutableCustomEvts = (discoSpi instanceof IgniteDiscoverySpi) &&
-            !((IgniteDiscoverySpi)discoSpi).supportsMutableCustomEvents();
-
-        discoProtoVer = nonMutableCustomEvts ? 2 : 1;
+        discoProtoVer = ctx.discovery().unmutableCustomMessages() ? 2 : 1;
 
         if (ctx.config().isDaemon())
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
index e9760a8..275765d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
@@ -42,7 +42,7 @@ public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
      * @param startReqData Start request data.
      * @param keepBinary Keep binary flag.
      */
-    public StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) {
+    StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) {
         super(routineId);
 
         this.startReqData = startReqData;

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index 79d8b29..dfba0e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -42,6 +42,11 @@ public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
index 7af0559..80e3f7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -63,6 +63,11 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
index b4e13fb..9358585 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -98,6 +98,11 @@ public class MappingProposedMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 2245b24..f802e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -59,6 +59,11 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean exchange() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b..62b6d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -60,6 +60,11 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean exchange() {
         return exchange;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index a0f9b75..10e1101 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -30,12 +30,23 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
-     * Called when message passed the ring.
+     * Called when custom message has been handled by all nodes.
+     *
+     * @return Ack message or {@code null} if ack is not required.
      */
     @Nullable public DiscoverySpiCustomMessage ackMessage();
 
     /**
-     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
      */
     public boolean isMutable();
+
+    /**
+     * Called on discovery coordinator node after listener is notified. If returns {@code true}
+     * then message is not passed to other nodes; if after this {@link #ackMessage()} returns ack, it is sent to
+     * all nodes.
+     *
+     * @return {@code True} if message should not be sent to all nodes.
+     */
+    public boolean stopProcess();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 147b78f..9b7476c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -58,6 +58,11 @@ class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMess
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index e85277b..bb63f30 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -50,6 +50,11 @@ public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCust
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 6375bc7..ec55682 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -30,7 +30,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int CUSTOM_MSG_ACK_FLAG = 0x01;
+    final long origEvtId;
 
     /** */
     final UUID sndNodeId;
@@ -46,37 +46,36 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
 
     /**
      * @param evtId Event ID.
+     * @param origEvtId For acknowledge events, ID of the original event.
      * @param topVer Topology version.
      * @param sndNodeId Sender node ID.
      * @param msg Message instance.
      * @param evtPath Event path.
-     * @param ack Acknowledge event flag.
      */
-    ZkDiscoveryCustomEventData(long evtId,
+    ZkDiscoveryCustomEventData(
+        long evtId,
+        long origEvtId,
         long topVer,
         UUID sndNodeId,
         DiscoverySpiCustomMessage msg,
-        String evtPath,
-        boolean ack)
+        String evtPath)
     {
         super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
 
         assert sndNodeId != null;
-        assert msg != null || ack || !F.isEmpty(evtPath);
+        assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
 
+        this.origEvtId = origEvtId;
         this.msg = msg;
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
-
-        if (ack)
-            flags |= CUSTOM_MSG_ACK_FLAG;
     }
 
     /**
      * @return {@code True} for custom event ack message.
      */
     boolean ackEvent() {
-        return flagSet(CUSTOM_MSG_ACK_FLAG);
+        return origEvtId != 0;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index cb2d0be..70e6ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -30,31 +30,31 @@ class ZkDiscoveryEventsData implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
+    /** Unique cluster ID (generated when first node in cluster starts). */
     final UUID clusterId;
 
-    /** */
-    int procCustEvt = -1;
+    /** Internal order of last processed custom event. */
+    long procCustEvt = -1;
 
-    /** */
+    /** Event ID counter. */
     long evtIdGen;
 
-    /** */
+    /** Current topology version. */
     long topVer;
 
-    /** */
+    /** Max node internal order in cluster. */
     long maxInternalOrder;
 
-    /** */
+    /** Min internal order in cluster. */
     final long startInternalOrder;
 
-    /** */
-    final long gridStartTime;
+    /** Cluster start time (recorded when first node in cluster starts). */
+    final long clusterStartTime;
 
-    /** */
+    /** Events to process. */
     final TreeMap<Long, ZkDiscoveryEventData> evts;
 
-    /** */
+    /** ID of current active communication error resolve process. */
     private UUID commErrFutId;
 
     /**
@@ -77,19 +77,19 @@ class ZkDiscoveryEventsData implements Serializable {
      * @param clusterId Cluster ID.
      * @param startInternalOrder Starting internal order for cluster.
      * @param topVer Current topology version.
-     * @param gridStartTime Cluster start time.
+     * @param clusterStartTime Cluster start time.
      * @param evts Events history.
      */
     private ZkDiscoveryEventsData(
         UUID clusterId,
         long startInternalOrder,
-        long gridStartTime,
+        long clusterStartTime,
         long topVer,
         TreeMap<Long, ZkDiscoveryEventData> evts)
     {
         this.clusterId = clusterId;
         this.startInternalOrder = startInternalOrder;
-        this.gridStartTime = gridStartTime;
+        this.clusterStartTime = clusterStartTime;
         this.topVer = topVer;
         this.evts = evts;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index 333f457..c76bcba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.Nullable;
 
@@ -52,4 +53,14 @@ public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInte
     @Override public boolean isMutable() {
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkForceNodeFailMessage.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 1c8706e..4ba6de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -322,11 +322,13 @@ class ZkIgnitePaths {
     }
 
     /**
-     * @param evtId Event ID.
+     * @param origEvtId ID of original custom event.
      * @return Path for custom event ack.
      */
-    String ackEventDataPath(long evtId) {
-        return customEvtsAcksDir + "/" + String.valueOf(evtId);
+    String ackEventDataPath(long origEvtId) {
+        assert origEvtId != 0;
+
+        return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index dcbd205..626fe74 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -39,6 +39,11 @@ class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ZkNoServersMessage.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 04bf113..ad7da00 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -1365,11 +1365,11 @@ public class ZookeeperDiscoveryImpl {
 
         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
             evtsData.evtIdGen,
+            0L,
             evtsData.topVer,
             locNode.id(),
             new ZkNoServersMessage(),
-            null,
-            false);
+            null);
 
         Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
 
@@ -2069,7 +2069,7 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled()) {
             log.info("New cluster started [locId=" + locNode.id() +
                 ", clusterId=" + rtState.evtsData.clusterId +
-                ", startTime=" + rtState.evtsData.gridStartTime + ']');
+                ", startTime=" + rtState.evtsData.clusterStartTime + ']');
         }
 
         locNode.internalId(locInternalId);
@@ -2176,7 +2176,7 @@ public class ZookeeperDiscoveryImpl {
         ZookeeperClient zkClient = rtState.zkClient;
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
-        TreeMap<Integer, String> newEvts = null;
+        TreeMap<Integer, String> unprocessedEvts = null;
 
         for (int i = 0; i < customEvtNodes.size(); i++) {
             String evtPath = customEvtNodes.get(i);
@@ -2184,129 +2184,164 @@ public class ZookeeperDiscoveryImpl {
             int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
 
             if (evtSeq > evtsData.procCustEvt) {
-                if (newEvts == null)
-                    newEvts = new TreeMap<>();
+                if (unprocessedEvts == null)
+                    unprocessedEvts = new TreeMap<>();
 
-                newEvts.put(evtSeq, evtPath);
+                unprocessedEvts.put(evtSeq, evtPath);
             }
         }
 
-        if (newEvts != null) {
-            Set<UUID> alives = null;
+        if (unprocessedEvts == null)
+            return;
 
-            for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
-                evtsData.procCustEvt = evtE.getKey();
+        for (Map.Entry<Integer, String> evtE : unprocessedEvts.entrySet()) {
+            evtsData.procCustEvt = evtE.getKey();
 
-                String evtPath = evtE.getValue();
+            String evtPath = evtE.getValue();
 
-                UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
+            UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
 
-                ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
+            ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
 
-                if (alives != null && !alives.contains(sndNode.id()))
-                    sndNode = null;
+            if (sndNode != null) {
+                byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);
 
-                if (sndNode != null) {
-                    byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);
+                DiscoverySpiCustomMessage msg;
 
-                    DiscoverySpiCustomMessage msg;
+                try {
+                    msg = unmarshalZip(evtBytes);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
 
-                    try {
-                        msg = unmarshalZip(evtBytes);
+                    deleteCustomEventDataAsync(rtState.zkClient, evtPath);
 
-                        if (msg instanceof ZkForceNodeFailMessage) {
-                            ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
+                    continue;
+                }
 
-                            if (alives == null)
-                                alives = new HashSet<>(rtState.top.nodesById.keySet());
+                generateAndProcessCustomEventOnCoordinator(evtPath, sndNode, msg);
+            }
+            else {
+                U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
 
-                            if (alives.contains(msg0.nodeId)) {
-                                evtsData.topVer++;
+                deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+            }
+        }
+    }
 
-                                alives.remove(msg0.nodeId);
+    /**
+     * @param evtPath Event data path.
+     * @param sndNode Sender node.
+     * @param msg Message instance.
+     * @throws Exception If failed.
+     */
+    private void generateAndProcessCustomEventOnCoordinator(String evtPath,
+        ZookeeperClusterNode sndNode,
+        DiscoverySpiCustomMessage msg) throws Exception
+    {
+        ZookeeperClient zkClient = rtState.zkClient;
+        ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
-                                ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId);
+        if (msg instanceof ZkForceNodeFailMessage) {
+            ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
 
-                                assert node != null :  msg0.nodeId;
+            if (rtState.top.nodesById.containsKey(msg0.nodeId))
+                evtsData.topVer++;
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId);
 
-                                // TODO ZK: delete when process event
-                                for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) {
-                                    if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) {
-                                        zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+                deleteCustomEventDataAsync(zkClient, evtPath);
 
-                                        break;
-                                    }
-                                }
-                            }
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId);
+                return;
+            }
+        }
+        else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
+            ZkCommunicationErrorResolveStartMessage msg0 =
+                (ZkCommunicationErrorResolveStartMessage)msg;
 
-                                deleteCustomEventDataAsync(zkClient, evtPath);
+            if (evtsData.communicationErrorResolveFutureId() != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("Ignore communication error resolve message, resolve process " +
+                        "already started [sndNode=" + sndNode + ']');
+                }
 
-                                continue;
-                            }
-                        }
-                        else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
-                            ZkCommunicationErrorResolveStartMessage msg0 =
-                                (ZkCommunicationErrorResolveStartMessage)msg;
-
-                            if (evtsData.communicationErrorResolveFutureId() != null) {
-                                if (log.isInfoEnabled()) {
-                                    log.info("Ignore communication error resolve message, resolve process " +
-                                        "already started [sndNode=" + sndNode + ']');
-                                }
+                deleteCustomEventDataAsync(zkClient, evtPath);
 
-                                deleteCustomEventDataAsync(zkClient, evtPath);
+                return;
+            }
+            else {
+                if (log.isInfoEnabled()) {
+                    log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
+                        ", reqId=" + msg0.id +
+                        ", topVer=" + evtsData.topVer + ']');
+                }
 
-                                continue;
-                            }
-                            else {
-                                if (log.isInfoEnabled()) {
-                                    log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
-                                        ", reqId=" + msg0.id +
-                                        ", topVer=" + evtsData.topVer + ']');
-                                }
+                zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
+                    null,
+                    PERSISTENT);
 
-                                zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
-                                    null,
-                                    PERSISTENT);
+                evtsData.communicationErrorResolveFutureId(msg0.id);
+            }
+        }
 
-                                evtsData.communicationErrorResolveFutureId(msg0.id);
-                            }
-                        }
+        evtsData.evtIdGen++;
 
-                        evtsData.evtIdGen++;
+        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+            evtsData.evtIdGen,
+            0L,
+            evtsData.topVer,
+            sndNode.id(),
+            null,
+            evtPath);
 
-                        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
-                            evtsData.evtIdGen,
-                            evtsData.topVer,
-                            sndNodeId,
-                            null,
-                            evtPath,
-                            false);
+        evtData.resolvedMsg = msg;
 
-                        evtData.resolvedMsg = msg;
+        if (log.isDebugEnabled())
+            log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
 
-                        evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+        boolean fastStopProcess = false;
 
-                        if (log.isDebugEnabled())
-                            log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
+        if (msg instanceof ZkInternalMessage)
+            processInternalMessage(evtData, (ZkInternalMessage)msg);
+        else {
+            notifyCustomEvent(evtData, msg);
 
-                        deleteCustomEventDataAsync(rtState.zkClient, evtPath);
-                    }
-                }
-                else {
-                    U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
+            if (msg.stopProcess()) {
+                if (log.isDebugEnabled())
+                    log.debug("Fast stop process custom event [evt=" + evtData + ", msg=" + msg + ']');
 
-                    deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+                fastStopProcess = true;
+
+                // No need to process this event on other nodes, skip this event.
+                evtsData.evts.remove(evtData.eventId());
+
+                evtsData.evtIdGen--;
+
+                DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+                if (ack != null) {
+                    evtData = createAckEvent(ack, evtData);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Generated CUSTOM event (ack for fast stop process) [evt=" + evtData + ", msg=" + msg + ']');
+
+                    notifyCustomEvent(evtData, ack);
                 }
+                else
+                    evtData = null;
             }
+        }
+
+        if (evtData != null) {
+            evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+
+            rtState.locNodeInfo.lastProcEvt = evtData.eventId();
 
             saveAndProcessNewEvents();
+
+            if (fastStopProcess)
+                deleteCustomEventDataAsync(zkClient, evtPath);
         }
     }
 
@@ -2463,7 +2498,7 @@ public class ZookeeperDiscoveryImpl {
                         else {
                             if (evtData0.msg == null) {
                                 if (evtData0.ackEvent()) {
-                                    String path = zkPaths.ackEventDataPath(evtData0.eventId());
+                                    String path = zkPaths.ackEventDataPath(evtData0.origEvtId);
 
                                     msg = unmarshalZip(zkClient.getData(path));
                                 }
@@ -2580,7 +2615,7 @@ public class ZookeeperDiscoveryImpl {
 
             ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes);
 
-            rtState.gridStartTime = evtsData.gridStartTime;
+            rtState.gridStartTime = evtsData.clusterStartTime;
 
             locNode.internalId(evtData.joinedInternalId);
             locNode.order(evtData.topologyVersion());
@@ -2697,6 +2732,18 @@ public class ZookeeperDiscoveryImpl {
             throw localNodeFail("Received force EVT_NODE_FAILED event for local node.", true);
         else
             notifyNodeFail(node.internalId(), evtData.topologyVersion());
+
+        if (rtState.crd) {
+            ZookeeperClient zkClient = rtState.zkClient;
+
+            for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) {
+                if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) {
+                    zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+
+                    break;
+                }
+            }
+        }
     }
 
     /**
@@ -3000,11 +3047,11 @@ public class ZookeeperDiscoveryImpl {
 
         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
             evtsData.evtIdGen,
+            0L,
             topVer,
             locNode.id(),
             msg,
-            null,
-            false);
+            null);
 
         evtData.resolvedMsg = msg;
 
@@ -3214,47 +3261,18 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
-                        DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData);
+                        DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx,
+                            (ZkDiscoveryCustomEventData)evtData);
 
                         if (ack != null) {
-                            rtState.evtsData.evtIdGen++;
-
-                            long evtId = rtState.evtsData.evtIdGen;
-
-                            byte[] ackBytes = marshalZip(ack);
-
-                            String path = zkPaths.ackEventDataPath(evtId);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Create ack event: " + path);
-
-                            // TODO ZK: delete if previous exists?
-                            rtState.zkClient.createIfNeeded(
-                                path,
-                                ackBytes,
-                                CreateMode.PERSISTENT);
-
-                            ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
-                                evtId,
-                                evtData.topologyVersion(), // Use topology version from original event.
-                                locNode.id(),
-                                null,
-                                null,
-                                true);
-
-                            ackEvtData.resolvedMsg = ack;
+                            ZkDiscoveryCustomEventData ackEvtData = createAckEvent(
+                                ack,
+                                (ZkDiscoveryCustomEventData)evtData);
 
                             if (newEvts == null)
                                 newEvts = new ArrayList<>();
 
                             newEvts.add(ackEvtData);
-
-                            if (log.isDebugEnabled()) {
-                                log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
-                                    ", evt=" + ackEvtData +
-                                    ", evtSize=" + ackBytes.length +
-                                    ", msg=" + ack + ']');
-                            }
                         }
 
                         break;
@@ -3285,6 +3303,54 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param ack Ack message (asserted non-null); it is marshalled and also set as {@code resolvedMsg}.
+     * @param origEvt Original custom event; provides the ack data path ID and the topology version.
+     * @return Ack event data carrying a newly generated event ID and the original event ID.
+     * @throws Exception If failed.
+     */
+    private ZkDiscoveryCustomEventData createAckEvent(
+        DiscoverySpiCustomMessage ack,
+        ZkDiscoveryCustomEventData origEvt) throws Exception {
+        assert ack != null;
+
+        rtState.evtsData.evtIdGen++; // Advance the generator; the new value becomes the ack event ID.
+
+        long evtId = rtState.evtsData.evtIdGen;
+
+        byte[] ackBytes = marshalZip(ack);
+
+        String path = zkPaths.ackEventDataPath(origEvt.eventId()); // Path is derived from the original event ID.
+
+        if (log.isDebugEnabled())
+            log.debug("Create ack event: " + path);
+
+        // TODO ZK: delete if previous exists?
+        rtState.zkClient.createIfNeeded(
+            path,
+            ackBytes,
+            CreateMode.PERSISTENT);
+
+        ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
+            evtId,
+            origEvt.eventId(),
+            origEvt.topologyVersion(), // Use topology version from original event.
+            locNode.id(),
+            null,
+            null);
+
+        ackEvtData.resolvedMsg = ack;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Generated CUSTOM event ack [origEvtId=" + origEvt.eventId() +
+                ", evt=" + ackEvtData +
+                ", evtSize=" + ackBytes.length +
+                ", msg=" + ack + ']');
+        }
+
+        return ackEvtData;
+    }
+
+    /**
      * @param failedNodes Failed nodes.
      * @throws Exception If failed.
      */
@@ -3383,7 +3449,7 @@ public class ZookeeperDiscoveryImpl {
                 return evtData.resolvedMsg.ackMessage();
         }
         else {
-            String path = zkPaths.ackEventDataPath(evtData.eventId());
+            String path = zkPaths.ackEventDataPath(evtData.origEvtId);
 
             if (log.isDebugEnabled())
                 log.debug("Delete path: " + path);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index c3c22be..7c2f642 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -65,22 +66,30 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.security.SecurityCredentials;
@@ -817,6 +826,329 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCustomEvents_FastStopProcess_1() throws Exception {
+        customEvents_FastStopProcess(1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEvents_FastStopProcess_2() throws Exception {
+        customEvents_FastStopProcess(5, 5);
+    }
+
+    /**
+     * @param srvs Number of server nodes to start (node 0 is started first and used as {@code crd}).
+     * @param clients Number of client nodes to start after the servers; {@code 0} starts none.
+     * @throws Exception If failed.
+     */
+    private void customEvents_FastStopProcess(int srvs, int clients) throws Exception {
+        ackEveryEventSystemProperty();
+
+        Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs =
+            new ConcurrentHashMap<>();
+
+        Ignite crd = startGrid(0);
+
+        UUID crdId = crd.cluster().localNode().id();
+
+        if (srvs > 1)
+            startGridsMultiThreaded(1, srvs - 1);
+
+        if (clients > 0) {
+            client = true;
+
+            startGridsMultiThreaded(srvs, clients);
+        }
+
+        awaitPartitionMapExchange();
+
+        List<Ignite> nodes = G.allGrids();
+
+        assertEquals(srvs + clients, nodes.size());
+
+        for (Ignite node : nodes)
+            registerTestEventListeners(node, rcvdMsgs);
+
+        int payload = 0;
+
+        AffinityTopologyVersion topVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx();
+
+        for (Ignite node : nodes) {
+            UUID sndId = node.cluster().localNode().id();
+
+            info("Send from node: " + sndId);
+
+            GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery();
+
+            { // Message without ack: only 'crd' is expected to record it, other nodes record nothing.
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = Collections.emptyList();
+
+                TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(false, payload++);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
+
+                discoveryMgr.sendCustomEvent(msg);
+
+                doSleep(200); // Wait some time to check extra messages are not received.
+
+                checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+                for (Ignite node0 : nodes) {
+                    if (node0 != crd)
+                        checkEvents(node0, rcvdMsgs, expNodesMsgs);
+                }
+
+                rcvdMsgs.clear();
+            }
+            { // Message with ack: 'crd' is expected to record message and ack, other nodes only the ack.
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = new ArrayList<>();
+
+                TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(true, payload++);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
+
+                discoveryMgr.sendCustomEvent(msg);
+
+                TestFastStopProcessCustomMessageAck ackMsg = new TestFastStopProcessCustomMessageAck(msg.payload);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+                expNodesMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+
+                doSleep(200); // Wait some time to check extra messages are not received.
+
+                checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+                for (Ignite node0 : nodes) {
+                    if (node0 != crd)
+                        checkEvents(node0, rcvdMsgs, expNodesMsgs);
+                }
+
+                rcvdMsgs.clear();
+            }
+
+            waitForEventsAcks(crd);
+        }
+    }
+
+    /**
+     * @param node Node to check; its local node ID is the lookup key in {@code rcvdMsgs}.
+     * @param rcvdMsgs Received messages keyed by the ID of the node that recorded them.
+     * @param expMsgs Expected messages; waits up to 5000 ms until at least this many were recorded.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(
+        Ignite node,
+        final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs,
+        final List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expMsgs) throws Exception {
+        final UUID nodeId = node.cluster().localNode().id();
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId);
+
+                int size = msgs == null ? 0 : msgs.size();
+
+                return size >= expMsgs.size();
+            }
+        }, 5000));
+
+        List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId);
+
+        if (msgs == null) // No list was ever created for this node: treat as "nothing received".
+            msgs = Collections.emptyList();
+
+        assertEqualsCollections(expMsgs, msgs);
+    }
+
+    /**
+     * @param node Node on which listeners for both test message classes are registered.
+     * @param rcvdMsgs Map to store received events, keyed by this node's ID; lists are created lazily.
+     */
+    private void registerTestEventListeners(Ignite node,
+        final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs) {
+        GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery();
+
+        final UUID nodeId = node.cluster().localNode().id();
+
+        discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessage.class,
+            new CustomEventListener<TestFastStopProcessCustomMessage>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) {
+                    List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+                    if (list == null) // NOTE(review): plain ArrayList, also read by the test thread - confirm this is safe.
+                        rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+                    list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg));
+                }
+            }
+        );
+        discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessageAck.class,
+            new CustomEventListener<TestFastStopProcessCustomMessageAck>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) {
+                    List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+                    if (list == null) // Same lazily created per-node list as used by the message listener.
+                        rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+                    list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg));
+                }
+            }
+        );
+    }
+
+    /**
+     * Test custom message whose {@link #stopProcess()} returns {@code true}; optionally creates an ack.
+     */
+    static class TestFastStopProcessCustomMessage implements DiscoveryCustomMessage {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Message ID (random; not part of {@link #equals(Object)}). */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** Whether {@link #ackMessage()} creates an ack message. */
+        private final boolean createAck;
+
+        /** Payload; passed to the ack message and compared in {@link #equals(Object)}. */
+        private final int payload;
+
+        /**
+         * @param createAck Create ack message flag.
+         * @param payload Payload.
+         */
+        TestFastStopProcessCustomMessage(boolean createAck, int payload) {
+            this.createAck = createAck;
+            this.payload = payload;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+            return createAck ? new TestFastStopProcessCustomMessageAck(payload) : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean stopProcess() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
+            AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestFastStopProcessCustomMessage that = (TestFastStopProcessCustomMessage)o;
+
+            return createAck == that.createAck && payload == that.payload;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(createAck, payload);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestFastStopProcessCustomMessage.class, this);
+        }
+    }
+
+    /**
+     * Test ack message; {@link #stopProcess()} returns {@code true} and no further ack is created.
+     */
+    static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessage {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Message ID (random; not part of {@link #equals(Object)}). */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** Payload; the only field compared in {@link #equals(Object)}. */
+        private final int payload;
+
+        /**
+         * @param payload Payload.
+         */
+        TestFastStopProcessCustomMessageAck(int payload) {
+            this.payload = payload;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean stopProcess() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
+            AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestFastStopProcessCustomMessageAck that = (TestFastStopProcessCustomMessageAck)o;
+            return payload == that.payload;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(payload);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestFastStopProcessCustomMessageAck.class, this);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSegmentation1() throws Exception {
         sesTimeout = 2000;
         testSockNio = true;


[2/3] ignite git commit: Merge branch 'ignite-zk' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-zk

Posted by sb...@apache.org.
Merge branch 'ignite-zk' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/830faea1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/830faea1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/830faea1

Branch: refs/heads/ignite-zk
Commit: 830faea14c4f45982abf6afa297e434736fe3187
Parents: 913a537 09ab864
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 14:30:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 26 14:30:32 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperDiscoverySpiBasicTest.java    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------