You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/27 08:21:54 UTC
[1/3] ignite git commit: zk
Repository: ignite
Updated Branches:
refs/heads/ignite-zk 09ab8649d -> 70bc10ba2
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/913a5373
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/913a5373
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/913a5373
Branch: refs/heads/ignite-zk
Commit: 913a5373d80a9c9d674e303ad9b50d0deb27d15a
Parents: 4e8516d
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 13:00:39 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 26 13:08:01 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/IgniteDiscoverySpi.java | 6 ++++++
.../processors/continuous/GridContinuousProcessor.java | 10 ++++++++--
.../apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 5 +++++
.../ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java | 5 +++++
.../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 2 ++
5 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index bf117f1..2e2b9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
/**
*
@@ -64,4 +65,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
* @param err Connection error.
*/
public void resolveCommunicationError(ClusterNode node, Exception err);
+
+ /**
+ * @return {@code True} if mutable {@link DiscoverySpiCustomMessage}s are supported.
+ */
+ public boolean supportsMutableCustomEvents();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dbcea4c..eb7966e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -84,7 +85,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -164,7 +165,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- discoProtoVer = ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi ? 1 : 2;
+ DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
+
+ boolean nonMutableCustomEvts = (discoSpi instanceof IgniteDiscoverySpi) &&
+ !((IgniteDiscoverySpi)discoSpi).supportsMutableCustomEvents();
+
+ discoProtoVer = nonMutableCustomEvts ? 2 : 1;
if (ctx.config().isDaemon())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9d7dce3..781272c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2111,6 +2111,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
}
/** {@inheritDoc} */
+ @Override public boolean supportsMutableCustomEvents() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public void resolveCommunicationError(ClusterNode node, Exception err) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index c4370cf..ebb667f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -246,6 +246,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
}
/** {@inheritDoc} */
+ @Override public boolean supportsMutableCustomEvents() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void resolveCommunicationError(ClusterNode node, Exception err) {
impl.resolveCommunicationError(node, err);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/913a5373/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index b8cfcfb..04bf113 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -3061,6 +3061,8 @@ public class ZookeeperDiscoveryImpl {
*/
@SuppressWarnings("unchecked")
private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) {
+ assert !(msg instanceof ZkInternalMessage) : msg;
+
if (log.isDebugEnabled())
log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg + ']');
[3/3] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70bc10ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70bc10ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70bc10ba
Branch: refs/heads/ignite-zk
Commit: 70bc10ba2dd5d9c4df194b2b35f8f924a8ec9822
Parents: 830faea
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 14:31:04 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 27 11:21:43 2017 +0300
----------------------------------------------------------------------
.../discovery/CustomMessageWrapper.java | 5 +
.../discovery/DiscoveryCustomMessage.java | 10 +-
.../discovery/GridDiscoveryManager.java | 11 +
.../cache/CacheAffinityChangeMessage.java | 5 +
.../cache/CacheStatisticsModeChangeMessage.java | 5 +
.../ClientCacheChangeDiscoveryMessage.java | 5 +
.../ClientCacheChangeDummyDiscoveryMessage.java | 5 +
.../cache/DynamicCacheChangeBatch.java | 5 +
.../binary/MetadataUpdateAcceptedMessage.java | 5 +
.../binary/MetadataUpdateProposedMessage.java | 5 +
.../cluster/ChangeGlobalStateFinishMessage.java | 5 +
.../cluster/ChangeGlobalStateMessage.java | 5 +
.../continuous/AbstractContinuousMessage.java | 5 +
.../continuous/GridContinuousProcessor.java | 7 +-
.../StartRoutineDiscoveryMessageV2.java | 2 +-
.../StopRoutineAckDiscoveryMessage.java | 5 +
.../marshaller/MappingAcceptedMessage.java | 5 +
.../marshaller/MappingProposedMessage.java | 5 +
.../message/SchemaFinishDiscoveryMessage.java | 5 +
.../message/SchemaProposeDiscoveryMessage.java | 5 +
.../discovery/DiscoverySpiCustomMessage.java | 15 +-
...kCommunicationErrorResolveFinishMessage.java | 5 +
...ZkCommunicationErrorResolveStartMessage.java | 5 +
.../zk/internal/ZkDiscoveryCustomEventData.java | 19 +-
.../zk/internal/ZkDiscoveryEventsData.java | 28 +-
.../zk/internal/ZkForceNodeFailMessage.java | 11 +
.../discovery/zk/internal/ZkIgnitePaths.java | 8 +-
.../zk/internal/ZkNoServersMessage.java | 5 +
.../zk/internal/ZookeeperDiscoveryImpl.java | 324 +++++++++++-------
.../ZookeeperDiscoverySpiBasicTest.java | 332 +++++++++++++++++++
30 files changed, 696 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 8f56248..133c727 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -49,6 +49,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
return delegate.isMutable();
}
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return delegate.stopProcess();
+ }
+
/**
* @return Delegate.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index c708c62..6ed2096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery;
import java.io.Serializable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
@@ -87,11 +88,18 @@ public interface DiscoveryCustomMessage extends Serializable {
@Nullable public DiscoveryCustomMessage ackMessage();
/**
- * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
*/
public boolean isMutable();
/**
+ * See {@link DiscoverySpiCustomMessage#stopProcess()}.
+ *
+ * @return {@code True} if message should not be sent to other nodes after it was processed on coordinator.
+ */
+ public boolean stopProcess();
+
+ /**
* Creates new discovery cache if message caused topology version change.
*
* @param mgr Discovery manager.
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9002233..eb6c2e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2394,6 +2394,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @return {@code True} if configured {@link DiscoverySpi} does not support mutable custom messages.
+ */
+ public boolean unmutableCustomMessages() {
+ DiscoverySpi spi = getSpi();
+
+ return (spi instanceof IgniteDiscoverySpi) &&
+ !((IgniteDiscoverySpi)spi).supportsMutableCustomEvents();
+
+ }
+
+ /**
* @param node Problem node.
* @param err Error.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index fe1014c..937a889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -156,6 +156,11 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
return discoCache.copy(topVer, null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
index 40bcfaf..e33256f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
@@ -101,6 +101,11 @@ public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index e35d80e..ae76c95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -173,6 +173,11 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 6ed3ecc..4ce0c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -105,6 +105,11 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 83459a5..d85e29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -77,6 +77,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
DiscoCache discoCache) {
return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
index 0416746..df64613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -71,6 +71,11 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index f9bd660..84e32e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -134,6 +134,11 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
index a1fbacf..c56c0eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -83,6 +83,11 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 6a642bc..f10a652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -116,6 +116,11 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
DiscoCache discoCache) {
return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index e9754d1..928c619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -63,6 +63,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eb7966e..899af48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -165,12 +165,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
-
- boolean nonMutableCustomEvts = (discoSpi instanceof IgniteDiscoverySpi) &&
- !((IgniteDiscoverySpi)discoSpi).supportsMutableCustomEvents();
-
- discoProtoVer = nonMutableCustomEvts ? 2 : 1;
+ discoProtoVer = ctx.discovery().unmutableCustomMessages() ? 2 : 1;
if (ctx.config().isDaemon())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
index e9760a8..275765d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
@@ -42,7 +42,7 @@ public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
* @param startReqData Start request data.
* @param keepBinary Keep binary flag.
*/
- public StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) {
+ StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) {
super(routineId);
this.startReqData = startReqData;
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index 79d8b29..dfba0e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -42,6 +42,11 @@ public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
index 7af0559..80e3f7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -63,6 +63,11 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
index b4e13fb..9358585 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -98,6 +98,11 @@ public class MappingProposedMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 2245b24..f802e09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -59,6 +59,11 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean exchange() {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b..62b6d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -60,6 +60,11 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean exchange() {
return exchange;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index a0f9b75..10e1101 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -30,12 +30,23 @@ import org.jetbrains.annotations.Nullable;
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
- * Called when message passed the ring.
+ * Called when custom message has been handled by all nodes.
+ *
+ * @return Ack message or {@code null} if ack is not required.
*/
@Nullable public DiscoverySpiCustomMessage ackMessage();
/**
- * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
*/
public boolean isMutable();
+
+ /**
+ * Called on discovery coordinator node after listener is notified. If returns {@code true}
+ * then message is not passed to other nodes; if after this {@link #ackMessage()} returns ack, it is sent to
+ * all nodes.
+ *
+ * @return {@code True} if message should not be sent to all nodes.
+ */
+ public boolean stopProcess();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 147b78f..9b7476c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -58,6 +58,11 @@ class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMess
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index e85277b..bb63f30 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -50,6 +50,11 @@ public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCust
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 6375bc7..ec55682 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -30,7 +30,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
private static final long serialVersionUID = 0L;
/** */
- private static final int CUSTOM_MSG_ACK_FLAG = 0x01;
+ final long origEvtId;
/** */
final UUID sndNodeId;
@@ -46,37 +46,36 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
/**
* @param evtId Event ID.
+ * @param origEvtId For acknowledge events, ID of the original event ({@code 0} for non-ack events).
* @param topVer Topology version.
* @param sndNodeId Sender node ID.
* @param msg Message instance.
* @param evtPath Event path.
- * @param ack Acknowledge event flag.
*/
- ZkDiscoveryCustomEventData(long evtId,
+ ZkDiscoveryCustomEventData(
+ long evtId,
+ long origEvtId,
long topVer,
UUID sndNodeId,
DiscoverySpiCustomMessage msg,
- String evtPath,
- boolean ack)
+ String evtPath)
{
super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
assert sndNodeId != null;
- assert msg != null || ack || !F.isEmpty(evtPath);
+ assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
+ this.origEvtId = origEvtId;
this.msg = msg;
this.sndNodeId = sndNodeId;
this.evtPath = evtPath;
-
- if (ack)
- flags |= CUSTOM_MSG_ACK_FLAG;
}
/**
* @return {@code True} for custom event ack message.
*/
boolean ackEvent() {
- return flagSet(CUSTOM_MSG_ACK_FLAG);
+ return origEvtId != 0;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index cb2d0be..70e6ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -30,31 +30,31 @@ class ZkDiscoveryEventsData implements Serializable {
/** */
private static final long serialVersionUID = 0L;
- /** */
+ /** Unique cluster ID (generated when first node in cluster starts). */
final UUID clusterId;
- /** */
- int procCustEvt = -1;
+ /** Internal order of last processed custom event. */
+ long procCustEvt = -1;
- /** */
+ /** Event ID counter. */
long evtIdGen;
- /** */
+ /** Current topology version. */
long topVer;
- /** */
+ /** Max node internal order in cluster. */
long maxInternalOrder;
- /** */
+ /** Min internal order in cluster. */
final long startInternalOrder;
- /** */
- final long gridStartTime;
+ /** Cluster start time (recorded when first node in cluster starts). */
+ final long clusterStartTime;
- /** */
+ /** Events to process. */
final TreeMap<Long, ZkDiscoveryEventData> evts;
- /** */
+ /** ID of current active communication error resolve process. */
private UUID commErrFutId;
/**
@@ -77,19 +77,19 @@ class ZkDiscoveryEventsData implements Serializable {
* @param clusterId Cluster ID.
* @param startInternalOrder Starting internal order for cluster.
* @param topVer Current topology version.
- * @param gridStartTime Cluster start time.
+ * @param clusterStartTime Cluster start time.
* @param evts Events history.
*/
private ZkDiscoveryEventsData(
UUID clusterId,
long startInternalOrder,
- long gridStartTime,
+ long clusterStartTime,
long topVer,
TreeMap<Long, ZkDiscoveryEventData> evts)
{
this.clusterId = clusterId;
this.startInternalOrder = startInternalOrder;
- this.gridStartTime = gridStartTime;
+ this.clusterStartTime = clusterStartTime;
this.topVer = topVer;
this.evts = evts;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index 333f457..c76bcba 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
@@ -52,4 +53,14 @@ public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInte
@Override public boolean isMutable() {
return false;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ZkForceNodeFailMessage.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 1c8706e..4ba6de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -322,11 +322,13 @@ class ZkIgnitePaths {
}
/**
- * @param evtId Event ID.
+ * @param origEvtId ID of original custom event.
* @return Path for custom event ack.
*/
- String ackEventDataPath(long evtId) {
- return customEvtsAcksDir + "/" + String.valueOf(evtId);
+ String ackEventDataPath(long origEvtId) {
+ assert origEvtId != 0;
+
+ return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index dcbd205..626fe74 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -39,6 +39,11 @@ class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage
}
/** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(ZkNoServersMessage.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 04bf113..ad7da00 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -1365,11 +1365,11 @@ public class ZookeeperDiscoveryImpl {
ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
evtsData.evtIdGen,
+ 0L,
evtsData.topVer,
locNode.id(),
new ZkNoServersMessage(),
- null,
- false);
+ null);
Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
@@ -2069,7 +2069,7 @@ public class ZookeeperDiscoveryImpl {
if (log.isInfoEnabled()) {
log.info("New cluster started [locId=" + locNode.id() +
", clusterId=" + rtState.evtsData.clusterId +
- ", startTime=" + rtState.evtsData.gridStartTime + ']');
+ ", startTime=" + rtState.evtsData.clusterStartTime + ']');
}
locNode.internalId(locInternalId);
@@ -2176,7 +2176,7 @@ public class ZookeeperDiscoveryImpl {
ZookeeperClient zkClient = rtState.zkClient;
ZkDiscoveryEventsData evtsData = rtState.evtsData;
- TreeMap<Integer, String> newEvts = null;
+ TreeMap<Integer, String> unprocessedEvts = null;
for (int i = 0; i < customEvtNodes.size(); i++) {
String evtPath = customEvtNodes.get(i);
@@ -2184,129 +2184,164 @@ public class ZookeeperDiscoveryImpl {
int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
if (evtSeq > evtsData.procCustEvt) {
- if (newEvts == null)
- newEvts = new TreeMap<>();
+ if (unprocessedEvts == null)
+ unprocessedEvts = new TreeMap<>();
- newEvts.put(evtSeq, evtPath);
+ unprocessedEvts.put(evtSeq, evtPath);
}
}
- if (newEvts != null) {
- Set<UUID> alives = null;
+ if (unprocessedEvts == null)
+ return;
- for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
- evtsData.procCustEvt = evtE.getKey();
+ for (Map.Entry<Integer, String> evtE : unprocessedEvts.entrySet()) {
+ evtsData.procCustEvt = evtE.getKey();
- String evtPath = evtE.getValue();
+ String evtPath = evtE.getValue();
- UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
+ UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
- ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
+ ZookeeperClusterNode sndNode = rtState.top.nodesById.get(sndNodeId);
- if (alives != null && !alives.contains(sndNode.id()))
- sndNode = null;
+ if (sndNode != null) {
+ byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);
- if (sndNode != null) {
- byte[] evtBytes = readCustomEventData(zkClient, evtPath, sndNodeId);
+ DiscoverySpiCustomMessage msg;
- DiscoverySpiCustomMessage msg;
+ try {
+ msg = unmarshalZip(evtBytes);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
- try {
- msg = unmarshalZip(evtBytes);
+ deleteCustomEventDataAsync(rtState.zkClient, evtPath);
- if (msg instanceof ZkForceNodeFailMessage) {
- ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
+ continue;
+ }
- if (alives == null)
- alives = new HashSet<>(rtState.top.nodesById.keySet());
+ generateAndProcessCustomEventOnCoordinator(evtPath, sndNode, msg);
+ }
+ else {
+ U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
- if (alives.contains(msg0.nodeId)) {
- evtsData.topVer++;
+ deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+ }
+ }
+ }
- alives.remove(msg0.nodeId);
+ /**
+ * @param evtPath Event data path.
+ * @param sndNode Sender node.
+ * @param msg Message instance.
+ * @throws Exception If failed.
+ */
+ private void generateAndProcessCustomEventOnCoordinator(String evtPath,
+ ZookeeperClusterNode sndNode,
+ DiscoverySpiCustomMessage msg) throws Exception
+ {
+ ZookeeperClient zkClient = rtState.zkClient;
+ ZkDiscoveryEventsData evtsData = rtState.evtsData;
- ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId);
+ if (msg instanceof ZkForceNodeFailMessage) {
+ ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
- assert node != null : msg0.nodeId;
+ if (rtState.top.nodesById.containsKey(msg0.nodeId))
+ evtsData.topVer++;
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId);
- // TODO ZK: delete when process event
- for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) {
- if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) {
- zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+ deleteCustomEventDataAsync(zkClient, evtPath);
- break;
- }
- }
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Ignore forcible node fail request for unknown node: " + msg0.nodeId);
+ return;
+ }
+ }
+ else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
+ ZkCommunicationErrorResolveStartMessage msg0 =
+ (ZkCommunicationErrorResolveStartMessage)msg;
- deleteCustomEventDataAsync(zkClient, evtPath);
+ if (evtsData.communicationErrorResolveFutureId() != null) {
+ if (log.isInfoEnabled()) {
+ log.info("Ignore communication error resolve message, resolve process " +
+ "already started [sndNode=" + sndNode + ']');
+ }
- continue;
- }
- }
- else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
- ZkCommunicationErrorResolveStartMessage msg0 =
- (ZkCommunicationErrorResolveStartMessage)msg;
-
- if (evtsData.communicationErrorResolveFutureId() != null) {
- if (log.isInfoEnabled()) {
- log.info("Ignore communication error resolve message, resolve process " +
- "already started [sndNode=" + sndNode + ']');
- }
+ deleteCustomEventDataAsync(zkClient, evtPath);
- deleteCustomEventDataAsync(zkClient, evtPath);
+ return;
+ }
+ else {
+ if (log.isInfoEnabled()) {
+ log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
+ ", reqId=" + msg0.id +
+ ", topVer=" + evtsData.topVer + ']');
+ }
- continue;
- }
- else {
- if (log.isInfoEnabled()) {
- log.info("Start cluster-wide communication error resolve [sndNode=" + sndNode +
- ", reqId=" + msg0.id +
- ", topVer=" + evtsData.topVer + ']');
- }
+ zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
+ null,
+ PERSISTENT);
- zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
- null,
- PERSISTENT);
+ evtsData.communicationErrorResolveFutureId(msg0.id);
+ }
+ }
- evtsData.communicationErrorResolveFutureId(msg0.id);
- }
- }
+ evtsData.evtIdGen++;
- evtsData.evtIdGen++;
+ ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+ evtsData.evtIdGen,
+ 0L,
+ evtsData.topVer,
+ sndNode.id(),
+ null,
+ evtPath);
- ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
- evtsData.evtIdGen,
- evtsData.topVer,
- sndNodeId,
- null,
- evtPath,
- false);
+ evtData.resolvedMsg = msg;
- evtData.resolvedMsg = msg;
+ if (log.isDebugEnabled())
+ log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
- evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+ boolean fastStopProcess = false;
- if (log.isDebugEnabled())
- log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
+ if (msg instanceof ZkInternalMessage)
+ processInternalMessage(evtData, (ZkInternalMessage)msg);
+ else {
+ notifyCustomEvent(evtData, msg);
- deleteCustomEventDataAsync(rtState.zkClient, evtPath);
- }
- }
- else {
- U.warn(log, "Ignore custom event from unknown node: " + sndNodeId);
+ if (msg.stopProcess()) {
+ if (log.isDebugEnabled())
+ log.debug("Fast stop process custom event [evt=" + evtData + ", msg=" + msg + ']');
- deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+ fastStopProcess = true;
+
+ // No need to process this event on other nodes, skip this event.
+ evtsData.evts.remove(evtData.eventId());
+
+ evtsData.evtIdGen--;
+
+ DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+ if (ack != null) {
+ evtData = createAckEvent(ack, evtData);
+
+ if (log.isDebugEnabled())
+ log.debug("Generated CUSTOM event (ack for fast stop process) [evt=" + evtData + ", msg=" + msg + ']');
+
+ notifyCustomEvent(evtData, ack);
}
+ else
+ evtData = null;
}
+ }
+
+ if (evtData != null) {
+ evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+
+ rtState.locNodeInfo.lastProcEvt = evtData.eventId();
saveAndProcessNewEvents();
+
+ if (fastStopProcess)
+ deleteCustomEventDataAsync(zkClient, evtPath);
}
}
@@ -2463,7 +2498,7 @@ public class ZookeeperDiscoveryImpl {
else {
if (evtData0.msg == null) {
if (evtData0.ackEvent()) {
- String path = zkPaths.ackEventDataPath(evtData0.eventId());
+ String path = zkPaths.ackEventDataPath(evtData0.origEvtId);
msg = unmarshalZip(zkClient.getData(path));
}
@@ -2580,7 +2615,7 @@ public class ZookeeperDiscoveryImpl {
ZkJoinEventDataForJoined dataForJoined = unmarshalZip(dataForJoinedBytes);
- rtState.gridStartTime = evtsData.gridStartTime;
+ rtState.gridStartTime = evtsData.clusterStartTime;
locNode.internalId(evtData.joinedInternalId);
locNode.order(evtData.topologyVersion());
@@ -2697,6 +2732,18 @@ public class ZookeeperDiscoveryImpl {
throw localNodeFail("Received force EVT_NODE_FAILED event for local node.", true);
else
notifyNodeFail(node.internalId(), evtData.topologyVersion());
+
+ if (rtState.crd) {
+ ZookeeperClient zkClient = rtState.zkClient;
+
+ for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) {
+ if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) {
+ zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+
+ break;
+ }
+ }
+ }
}
/**
@@ -3000,11 +3047,11 @@ public class ZookeeperDiscoveryImpl {
ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
evtsData.evtIdGen,
+ 0L,
topVer,
locNode.id(),
msg,
- null,
- false);
+ null);
evtData.resolvedMsg = msg;
@@ -3214,47 +3261,18 @@ public class ZookeeperDiscoveryImpl {
}
case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
- DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData);
+ DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx,
+ (ZkDiscoveryCustomEventData)evtData);
if (ack != null) {
- rtState.evtsData.evtIdGen++;
-
- long evtId = rtState.evtsData.evtIdGen;
-
- byte[] ackBytes = marshalZip(ack);
-
- String path = zkPaths.ackEventDataPath(evtId);
-
- if (log.isDebugEnabled())
- log.debug("Create ack event: " + path);
-
- // TODO ZK: delete if previous exists?
- rtState.zkClient.createIfNeeded(
- path,
- ackBytes,
- CreateMode.PERSISTENT);
-
- ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
- evtId,
- evtData.topologyVersion(), // Use topology version from original event.
- locNode.id(),
- null,
- null,
- true);
-
- ackEvtData.resolvedMsg = ack;
+ ZkDiscoveryCustomEventData ackEvtData = createAckEvent(
+ ack,
+ (ZkDiscoveryCustomEventData)evtData);
if (newEvts == null)
newEvts = new ArrayList<>();
newEvts.add(ackEvtData);
-
- if (log.isDebugEnabled()) {
- log.debug("Generated CUSTOM event ack [baseEvtId=" + evtData.eventId() +
- ", evt=" + ackEvtData +
- ", evtSize=" + ackBytes.length +
- ", msg=" + ack + ']');
- }
}
break;
@@ -3285,6 +3303,54 @@ public class ZookeeperDiscoveryImpl {
}
/**
+ * @param ack Ack message.
+ * @param origEvt Original custom event.
+ * @return Event data.
+ * @throws Exception If failed.
+ */
+ private ZkDiscoveryCustomEventData createAckEvent(
+ DiscoverySpiCustomMessage ack,
+ ZkDiscoveryCustomEventData origEvt) throws Exception {
+ assert ack != null;
+
+ rtState.evtsData.evtIdGen++;
+
+ long evtId = rtState.evtsData.evtIdGen;
+
+ byte[] ackBytes = marshalZip(ack);
+
+ String path = zkPaths.ackEventDataPath(origEvt.eventId());
+
+ if (log.isDebugEnabled())
+ log.debug("Create ack event: " + path);
+
+ // TODO ZK: delete if previous exists?
+ rtState.zkClient.createIfNeeded(
+ path,
+ ackBytes,
+ CreateMode.PERSISTENT);
+
+ ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData(
+ evtId,
+ origEvt.eventId(),
+ origEvt.topologyVersion(), // Use topology version from original event.
+ locNode.id(),
+ null,
+ null);
+
+ ackEvtData.resolvedMsg = ack;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Generated CUSTOM event ack [origEvtId=" + origEvt.eventId() +
+ ", evt=" + ackEvtData +
+ ", evtSize=" + ackBytes.length +
+ ", msg=" + ack + ']');
+ }
+
+ return ackEvtData;
+ }
+
+ /**
* @param failedNodes Failed nodes.
* @throws Exception If failed.
*/
@@ -3383,7 +3449,7 @@ public class ZookeeperDiscoveryImpl {
return evtData.resolvedMsg.ackMessage();
}
else {
- String path = zkPaths.ackEventDataPath(evtData.eventId());
+ String path = zkPaths.ackEventDataPath(evtData.origEvtId);
if (log.isDebugEnabled())
log.debug("Delete path: " + path);
http://git-wip-us.apache.org/repos/asf/ignite/blob/70bc10ba/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index c3c22be..7c2f642 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -65,22 +66,30 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.logger.java.JavaLogger;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.security.SecurityCredentials;
@@ -817,6 +826,329 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
/**
+ * Runs {@link #customEvents_FastStopProcess(int, int)} with one server and no clients.
+ *
* @throws Exception If failed.
*/
+ public void testCustomEvents_FastStopProcess_1() throws Exception {
+ customEvents_FastStopProcess(1, 0);
+ }
+
+ /**
+ * Runs {@link #customEvents_FastStopProcess(int, int)} with five servers and five clients.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCustomEvents_FastStopProcess_2() throws Exception {
+ customEvents_FastStopProcess(5, 5);
+ }
+
+ /**
+ * Starts the requested topology, registers recording listeners on every node and then, using each
+ * node in turn as the sender, sends two {@code TestFastStopProcessCustomMessage} instances and checks
+ * which nodes' listeners observed them.
+ * <p>
+ * Step 1 (message created with {@code createAck == false}): only the node started first (grid 0,
+ * {@code crd}) is expected to record the message; every other node is expected to record nothing.
+ * <p>
+ * Step 2 (message created with {@code createAck == true}): {@code crd} is expected to record the
+ * message followed by a {@code TestFastStopProcessCustomMessageAck} whose sender is {@code crd};
+ * every other node is expected to record only that ack.
+ * <p>
+ * All expectations use the topology version read from {@code crd} once, before any message is sent.
+ * The received-messages map is cleared after each step so the steps are checked independently.
+ *
+ * @param srvs Number of server nodes to start, including grid 0.
+ * @param clients Number of nodes to start after the {@code client} flag is set to {@code true}.
+ * @throws Exception If failed.
+ */
+ private void customEvents_FastStopProcess(int srvs, int clients) throws Exception {
+ ackEveryEventSystemProperty();
+
+ Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs =
+ new ConcurrentHashMap<>();
+
+ Ignite crd = startGrid(0);
+
+ UUID crdId = crd.cluster().localNode().id();
+
+ if (srvs > 1)
+ startGridsMultiThreaded(1, srvs - 1);
+
+ if (clients > 0) {
+ client = true;
+
+ startGridsMultiThreaded(srvs, clients);
+ }
+
+ awaitPartitionMapExchange();
+
+ List<Ignite> nodes = G.allGrids();
+
+ assertEquals(srvs + clients, nodes.size());
+
+ for (Ignite node : nodes)
+ registerTestEventListeners(node, rcvdMsgs);
+
+ int payload = 0;
+
+ AffinityTopologyVersion topVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx();
+
+ for (Ignite node : nodes) {
+ UUID sndId = node.cluster().localNode().id();
+
+ info("Send from node: " + sndId);
+
+ GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery();
+
+ {
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = Collections.emptyList();
+
+ TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(false, payload++);
+
+ expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
+
+ discoveryMgr.sendCustomEvent(msg);
+
+ doSleep(200); // Give unexpected extra messages time to arrive, so the checks below can detect them.
+
+ checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+ for (Ignite node0 : nodes) {
+ if (node0 != crd)
+ checkEvents(node0, rcvdMsgs, expNodesMsgs);
+ }
+
+ rcvdMsgs.clear();
+ }
+ {
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = new ArrayList<>();
+
+ TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(true, payload++);
+
+ expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg));
+
+ discoveryMgr.sendCustomEvent(msg);
+
+ TestFastStopProcessCustomMessageAck ackMsg = new TestFastStopProcessCustomMessageAck(msg.payload);
+
+ expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+ expNodesMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+
+ doSleep(200); // Give unexpected extra messages time to arrive, so the checks below can detect them.
+
+ checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+ for (Ignite node0 : nodes) {
+ if (node0 != crd)
+ checkEvents(node0, rcvdMsgs, expNodesMsgs);
+ }
+
+ rcvdMsgs.clear();
+ }
+
+ waitForEventsAcks(crd);
+ }
+ }
+
+ /**
+ * Waits until the given node has recorded at least as many messages as expected, then asserts that
+ * the recorded list equals the expected list. A node with no entry in {@code rcvdMsgs} is treated
+ * as having recorded an empty list.
+ * <p>
+ * When {@code expMsgs} is empty the wait condition holds immediately, so detecting unexpected
+ * messages relies on the caller having waited before calling this method.
+ * <p>
+ * NOTE(review): the wait uses a timeout argument of 5000, presumably milliseconds; confirm against
+ * {@code GridTestUtils.waitForCondition}.
+ *
+ * @param node Node to check.
+ * @param rcvdMsgs Received messages, keyed by the ID of the node that recorded them.
+ * @param expMsgs Expected messages, in the expected order.
+ * @throws Exception If failed.
+ */
+ private void checkEvents(
+ Ignite node,
+ final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs,
+ final List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expMsgs) throws Exception {
+ final UUID nodeId = node.cluster().localNode().id();
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId);
+
+ int size = msgs == null ? 0 : msgs.size();
+
+ return size >= expMsgs.size();
+ }
+ }, 5000));
+
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId);
+
+ if (msgs == null)
+ msgs = Collections.emptyList();
+
+ assertEqualsCollections(expMsgs, msgs);
+ }
+
+ /**
+ * Registers custom event listeners for {@code TestFastStopProcessCustomMessage} and
+ * {@code TestFastStopProcessCustomMessageAck} on the given node's discovery manager. Each listener
+ * appends a (topology version, sender node ID, message) tuple to the list stored in
+ * {@code rcvdMsgs} under this node's ID, creating the list on the first message.
+ * <p>
+ * NOTE(review): the get-then-put on the map and the {@code ArrayList} it stores are not
+ * synchronized, while the test thread reads and clears the same map; this presumably relies on
+ * the listeners of one node being invoked from a single thread. Confirm before reusing elsewhere.
+ *
+ * @param node Node.
+ * @param rcvdMsgs Map to store received events, keyed by the ID of the receiving node.
+ */
+ private void registerTestEventListeners(Ignite node,
+ final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs) {
+ GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery();
+
+ final UUID nodeId = node.cluster().localNode().id();
+
+ discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessage.class,
+ new CustomEventListener<TestFastStopProcessCustomMessage>() {
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) {
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+ if (list == null)
+ rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+ list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg));
+ }
+ }
+ );
+ discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessageAck.class,
+ new CustomEventListener<TestFastStopProcessCustomMessageAck>() {
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) {
+ List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+ if (list == null)
+ rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+ list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg));
+ }
+ }
+ );
+ }
+
+ /**
+ * Test custom discovery message whose {@link #stopProcess()} always returns {@code true}. Depending
+ * on the {@code createAck} flag, {@link #ackMessage()} either creates a
+ * {@link TestFastStopProcessCustomMessageAck} carrying the same payload or returns {@code null}.
+ * <p>
+ * Equality and hash code use only {@code createAck} and {@code payload}; the randomly generated
+ * {@code id} is ignored.
+ */
+ static class TestFastStopProcessCustomMessage implements DiscoveryCustomMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, generated randomly for each instance. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();;
+
+ /** Whether {@link #ackMessage()} creates an ack message. */
+ private final boolean createAck;
+
+ /** Payload; copied into the ack message and compared by {@link #equals(Object)}. */
+ private final int payload;
+
+ /**
+ * @param createAck Create ack message flag.
+ * @param payload Payload.
+ */
+ TestFastStopProcessCustomMessage(boolean createAck, int payload) {
+ this.createAck = createAck;
+ this.payload = payload;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} Returns a new ack with this message's payload if {@code createAck} is set, otherwise {@code null}. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return createAck ? new TestFastStopProcessCustomMessageAck(payload) : null;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this test message. */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code true} for this test message. */
+ @Override public boolean stopProcess() {
+ return true;
+ }
+
+ /** {@inheritDoc} Always returns {@code null}. */
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ return null;
+ }
+
+ /** {@inheritDoc} Compares {@code createAck} and {@code payload} only; {@code id} is not compared. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestFastStopProcessCustomMessage that = (TestFastStopProcessCustomMessage)o;
+
+ return createAck == that.createAck && payload == that.payload;
+ }
+
+ /** {@inheritDoc} Hashes the same fields that {@link #equals(Object)} compares. */
+ @Override public int hashCode() {
+ return Objects.hash(createAck, payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestFastStopProcessCustomMessage.class, this);
+ }
+ }
+
+ /**
+ * Ack message created by {@code TestFastStopProcessCustomMessage.ackMessage()}; it has no ack of
+ * its own and its {@link #stopProcess()} always returns {@code true}.
+ * <p>
+ * Equality and hash code use only {@code payload}; the randomly generated {@code id} is ignored.
+ */
+ static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, generated randomly for each instance. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();;
+
+ /** Payload; the only field compared by {@link #equals(Object)}. */
+ private final int payload;
+
+ /**
+ * @param payload Payload.
+ */
+ TestFastStopProcessCustomMessageAck(int payload) {
+ this.payload = payload;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} Always returns {@code null}. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this test message. */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code true} for this test message. */
+ @Override public boolean stopProcess() {
+ return true;
+ }
+
+ /** {@inheritDoc} Always returns {@code null}. */
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ return null;
+ }
+
+ /** {@inheritDoc} Compares {@code payload} only; {@code id} is not compared. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestFastStopProcessCustomMessageAck that = (TestFastStopProcessCustomMessageAck)o;
+ return payload == that.payload;
+ }
+
+ /** {@inheritDoc} Hashes the same field that {@link #equals(Object)} compares. */
+ @Override public int hashCode() {
+ return Objects.hash(payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestFastStopProcessCustomMessageAck.class, this);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSegmentation1() throws Exception {
sesTimeout = 2000;
testSockNio = true;
[2/3] ignite git commit: Merge branch 'ignite-zk' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-zk
Posted by sb...@apache.org.
Merge branch 'ignite-zk' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/830faea1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/830faea1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/830faea1
Branch: refs/heads/ignite-zk
Commit: 830faea14c4f45982abf6afa297e434736fe3187
Parents: 913a537 09ab864
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 26 14:30:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 26 14:30:32 2017 +0300
----------------------------------------------------------------------
.../zk/internal/ZookeeperDiscoverySpiBasicTest.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------