You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/24 08:11:36 UTC
[3/6] ignite git commit: Merging IGNITE-1171 - fixed problems with
custom events in discovery
Merging IGNITE-1171 - fixed problems with custom events in discovery
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f3ef6a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f3ef6a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f3ef6a8
Branch: refs/heads/master
Commit: 6f3ef6a84ee1c3e77d32ca9930835d1720918e20
Parents: 517d0f5
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Sep 23 16:36:15 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Sep 23 16:36:15 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 2 +-
.../cache/DynamicCacheDescriptor.java | 10 +-
.../GridCachePartitionExchangeManager.java | 6 +
.../processors/cache/GridCacheProcessor.java | 18 +-
.../continuous/CacheContinuousQueryManager.java | 10 +-
.../communication/tcp/TcpCommunicationSpi.java | 7 +-
.../discovery/DiscoverySpiCustomMessage.java | 12 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 301 ++++++++++++++----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 94 ++----
.../messages/TcpDiscoveryDiscardMessage.java | 15 +-
.../TcpDiscoveryNodeAddFinishedMessage.java | 2 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 19 +-
.../distributed/CacheAffEarlySelfTest.java | 245 ---------------
.../distributed/CacheAffinityEarlyTest.java | 168 ++++++++++
...GridCacheValueConsistencyAtomicSelfTest.java | 2 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 53 ++--
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 315 ++++++++++++++++++-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
20 files changed, 864 insertions(+), 425 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
index 83e2525..bac1a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
@@ -160,4 +160,4 @@ public interface IgniteAtomicLong extends Closeable {
* @throws IgniteException If operation failed.
*/
@Override public void close();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index f3c3be9..3cfc34e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -39,9 +39,6 @@ public class DynamicCacheDescriptor {
@GridToStringExclude
private CacheConfiguration cacheCfg;
- /** Cancelled flag. */
- private boolean cancelled;
-
/** Locally configured flag. */
private boolean locCfg;
@@ -156,6 +153,13 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Started flag.
+ */
+ public boolean started() {
+ return started;
+ }
+
+ /**
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 34c571c..eb76233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1435,6 +1435,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private static final long serialVersionUID = 0L;
/** */
+ @GridToStringInclude
private AffinityTopologyVersion topVer;
/**
@@ -1455,5 +1456,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return done;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AffinityReadyFuture.class, this, super.toString());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e92ea57..74124bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1522,10 +1522,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Collection of started cache names.
*/
public Collection<String> cacheNames() {
- return F.viewReadOnly(registeredCaches.keySet(),
- new IgniteClosure<String, String>() {
- @Override public String apply(String s) {
- return unmaskNull(s);
+ return F.viewReadOnly(registeredCaches.values(),
+ new IgniteClosure<DynamicCacheDescriptor, String>() {
+ @Override public String apply(DynamicCacheDescriptor desc) {
+ return desc.cacheConfiguration().getName();
+ }
+ },
+ new IgnitePredicate<DynamicCacheDescriptor>() {
+ @Override public boolean apply(DynamicCacheDescriptor desc) {
+ return desc.started();
}
});
}
@@ -1568,6 +1573,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.deploymentId(),
topVer
);
+
+ DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+
+ if (desc != null)
+ desc.onStart();
}
// Start statically configured caches received from remote nodes during exchange.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..c719f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -448,8 +448,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
taskNameHash,
skipPrimaryCheck);
- UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
- autoUnsubscribe, grp.predicate()).get();
+ UUID id = cctx.kernalContext().continuous().startRoutine(
+ hnd,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ grp.predicate()).get();
if (notifyExisting) {
final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -811,4 +815,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2594213..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
timeoutHelper.checkFailureTimeoutReached(e))) {
- log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
- failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+ if (log.isDebugEnabled())
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+ failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
@@ -2700,7 +2701,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* FOR TEST PURPOSES ONLY!!!
*/
- void simulateNodeFailure() {
+ public void simulateNodeFailure() {
if (nioSrvr != null)
nioSrvr.stop();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
package org.apache.ignite.spi.discovery;
import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.jetbrains.annotations.Nullable;
/**
* Message to send across ring.
*
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
* @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
*/
public boolean isMutable();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8a205d2..d8ee953 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,10 +37,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
@@ -64,6 +67,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -145,7 +149,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
/**
*
*/
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@SuppressWarnings("All")
class ServerImpl extends TcpDiscoveryImpl {
/** */
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
@@ -1368,8 +1372,13 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Messages to include.
* @param discardMsgId Discarded message ID.
*/
- private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ private void prepareNodeAddedMessage(
+ TcpDiscoveryAbstractMessage msg,
+ UUID destNodeId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
assert destNodeId != null;
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1393,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId);
+ nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1416,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null);
+ nodeAddedMsg.messages(null, null, null);
}
}
@@ -1825,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null);
+ prepareNodeAddedMessage(msg, destNodeId, null, null, null);
return msg;
}
@@ -1834,16 +1843,22 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* Pending messages container.
*/
- private static class PendingMessages {
+ private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
/** */
private static final int MAX = 1024;
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ /** Processed custom message IDs. */
+ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
+
/** Discarded message ID. */
private IgniteUuid discardId;
+ /** Discarded message ID. */
+ private IgniteUuid customDiscardId;
+
/**
* Adds pending message and shrinks queue if it exceeds limit
* (messages that were not discarded yet are never removed).
@@ -1869,31 +1884,118 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Message.
* @param discardId Discarded message ID.
*/
- void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+ void reset(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardId,
+ @Nullable IgniteUuid customDiscardId
+ ) {
this.msgs.clear();
if (msgs != null)
this.msgs.addAll(msgs);
this.discardId = discardId;
+ this.customDiscardId = customDiscardId;
}
/**
- * Clears pending messages.
+ * Discards message with provided ID and all before it.
+ *
+ * @param id Discarded message ID.
*/
- void clear() {
- msgs.clear();
+ void discard(IgniteUuid id, boolean custom) {
+ if (custom)
+ customDiscardId = id;
+ else
+ discardId = id;
+ }
- discardId = null;
+ /**
+ * Gets iterator for non-discarded messages.
+ *
+ * @return Non-discarded messages iterator.
+ */
+ public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+ return new SkipIterator();
}
/**
- * Discards message with provided ID and all before it.
*
- * @param id Discarded message ID.
*/
- void discard(IgniteUuid id) {
- discardId = id;
+ private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage> {
+ /** Skip non-custom messages flag. */
+ private boolean skipMsg = discardId != null;
+
+ /** Skip custom messages flag. */
+ private boolean skipCustomMsg;
+
+ /** Internal iterator. */
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ /** Next message. */
+ private TcpDiscoveryAbstractMessage next;
+
+ {
+ advance();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TcpDiscoveryAbstractMessage next() {
+ if (next == null)
+ throw new NoSuchElementException();
+
+ TcpDiscoveryAbstractMessage next0 = next;
+
+ advance();
+
+ return next0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Advances iterator to the next available item.
+ */
+ private void advance() {
+ next = null;
+
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+ if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
+
+ if (F.eq(customDiscardId, msg0.id()))
+ skipCustomMsg = false;
+
+ continue;
+ }
+ }
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg0.id()))
+ skipMsg = false;
+
+ continue;
+ }
+ }
+
+ next = msg0;
+
+ break;
+ }
+ }
}
}
@@ -1941,6 +2043,12 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Connection check threshold. */
private long connCheckThreshold;
+ /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
+ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
+
+ /** Collection to track joining nodes. */
+ private Set<UUID> joiningNodes = new HashSet<>();
+
/**
*/
protected RingMessageWorker() {
@@ -2046,6 +2154,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sendHeartbeatMessage();
checkHeartbeatsReceiving();
+
+ checkPendingCustomMessages();
}
/**
@@ -2323,20 +2433,11 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- continue;
- }
-
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
- pendingMsgs.discardId);
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2354,13 +2455,13 @@ class ServerImpl extends TcpDiscoveryImpl {
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
- log.debug("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
if (debugMode)
- debugLog("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
// Resetting timeout control object to create a new one for the next bunch of
@@ -2377,7 +2478,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -2478,21 +2580,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg instanceof TcpDiscoveryStatusCheckMessage) {
- TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg;
-
- if (next.id().equals(msg0.failedNodeId())) {
- next = null;
-
- if (log.isDebugEnabled())
- log.debug("Discarding status check since next node has indeed failed [next=" + next +
- ", msg=" + msg + ']');
-
- // Discard status check message by exiting loop and handle failure.
- break;
- }
- }
-
next = null;
searchNext = true;
@@ -2524,6 +2611,29 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryNode n : failedNodes)
msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
+ if (!sent) {
+ if (log.isDebugEnabled())
+ log.debug("Pending messages will be resent to local node");
+
+ if (debugMode)
+ log.debug("Pending messages will be resent to local node");
+
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
+
+ msgWorker.addMessage(pendingMsg);
+
+ if (log.isDebugEnabled())
+ log.debug("Pending message has been sent to local node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+
+ if (debugMode)
+ debugLog("Pending message has been sent to local node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ }
+ }
+
LT.warn(log, null, "Local node has detected failed nodes and started cluster-wide procedure. " +
"To speed up failure detection please see 'Failure Detection' section under javadoc" +
" for 'TcpDiscoverySpi'");
@@ -3077,7 +3187,7 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3118,6 +3228,8 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ joiningNodes.add(node.id());
+
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
@@ -3222,6 +3334,8 @@ class ServerImpl extends TcpDiscoveryImpl {
n.visible(true);
}
+ joiningNodes.clear();
+
locNode.setAttributes(node.attributes());
locNode.visible(true);
@@ -3237,10 +3351,11 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.discard(msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+ msg.discardedCustomMessageId());
// Clear data to minimize message size.
- msg.messages(null, null);
+ msg.messages(null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
@@ -3307,7 +3422,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3342,7 +3457,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+ joiningNodes.remove(nodeId);
+
+ TcpDiscoverySpiState state = spiStateCopy();
+
+ if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) {
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3357,7 +3476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
- ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
+ ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
@@ -3365,7 +3484,8 @@ class ServerImpl extends TcpDiscoveryImpl {
lastMsg = msg;
}
- notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+ if (state == CONNECTED)
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node);
try {
if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3381,7 +3501,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
+ if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING) {
assert node != null;
assert topVer > 0 : "Invalid topology version: " + msg;
@@ -3402,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
+
+ checkPendingCustomMessages();
}
/**
@@ -3481,7 +3603,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3553,6 +3675,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ joiningNodes.remove(leftNode.id());
+
spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3580,6 +3704,8 @@ class ServerImpl extends TcpDiscoveryImpl {
U.closeQuiet(sock);
}
+
+ checkPendingCustomMessages();
}
/**
@@ -3650,7 +3776,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3707,6 +3833,8 @@ class ServerImpl extends TcpDiscoveryImpl {
", msg=" + msg.warning() + ']');
}
+ joiningNodes.remove(node.id());
+
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
@@ -3720,6 +3848,8 @@ class ServerImpl extends TcpDiscoveryImpl {
U.closeQuiet(sock);
}
+
+ checkPendingCustomMessages();
}
/**
@@ -4046,7 +4176,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4098,18 +4228,23 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
- boolean sndNext;
+ if (!joiningNodes.isEmpty()) {
+ pendingCustomMsgs.add(msg);
- if (!msg.verified()) {
+ return;
+ }
+
+ boolean sndNext = !msg.verified();
+
+ if (sndNext) {
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
- notifyDiscoveryListener(msg);
-
- sndNext = true;
+ if (pendingMsgs.procCustomMsgs.add(msg.id()))
+ notifyDiscoveryListener(msg);
+ else
+ sndNext = false;
}
- else
- sndNext = false;
if (sndNext && ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4139,12 +4274,30 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
}
}
else {
- if (msg.verified())
+ TcpDiscoverySpiState state0;
+
+ synchronized (mux) {
+ state0 = spiState;
+ }
+
+ if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']');
+
+ return;
+ }
+
+ if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
+ ", topver=" + ring.topologyVersion();
+ assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion();
+
notifyDiscoveryListener(msg);
+ }
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4152,6 +4305,18 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Checks and flushes custom event messages if no nodes are attempting to join the grid.
+ */
+ private void checkPendingCustomMessages() {
+ if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
+ TcpDiscoveryCustomEventMessage msg;
+
+ while ((msg = pendingCustomMsgs.poll()) != null)
+ processCustomMessage(msg);
+ }
+ }
+
+ /**
* @param msg Custom message.
*/
private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
@@ -5081,7 +5246,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null);
+ prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
return res;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 80fcc46..6254605 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2038,4 +2038,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return S.toString(SocketTimeoutObject.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 2b17696..7ca092c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,7 +17,17 @@
package org.apache.ignite.spi.discovery.tcp.internal;
-import java.util.ArrayList;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -29,16 +39,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.Nullable;
/**
* Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -81,6 +81,9 @@ public class TcpDiscoveryNodesRing {
/** */
private long nodeOrder;
+ /** */
+ private long maxInternalOrder;
+
/** Lock. */
@GridToStringExclude
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -99,6 +102,8 @@ public class TcpDiscoveryNodesRing {
this.locNode = locNode;
clear();
+
+ maxInternalOrder = locNode.internalOrder();
}
finally {
rwLock.writeLock().unlock();
@@ -204,7 +209,9 @@ public class TcpDiscoveryNodesRing {
if (nodesMap.containsKey(node.id()))
return false;
- assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+ long maxInternalOrder0 = maxInternalOrder();
+
+ assert node.internalOrder() > maxInternalOrder0 : "Adding node to the middle of the ring " +
"[ring=" + this + ", node=" + node + ']';
nodesMap.put(node.id(), node);
@@ -216,6 +223,8 @@ public class TcpDiscoveryNodesRing {
nodes.add(node);
nodeOrder = node.internalOrder();
+
+ maxInternalOrder = node.internalOrder();
}
finally {
rwLock.writeLock().unlock();
@@ -231,9 +240,13 @@ public class TcpDiscoveryNodesRing {
rwLock.readLock().lock();
try {
- TcpDiscoveryNode last = nodes.last();
+ if (maxInternalOrder == 0) {
+ TcpDiscoveryNode last = nodes.last();
+
+ return last != null ? maxInternalOrder = last.internalOrder() : -1;
+ }
- return last != null ? last.internalOrder() : -1;
+ return maxInternalOrder;
}
finally {
rwLock.readLock().unlock();
@@ -336,47 +349,6 @@ public class TcpDiscoveryNodesRing {
}
/**
- * Removes nodes from the topology.
- *
- * @param nodeIds IDs of the nodes to remove.
- * @return Collection of removed nodes.
- */
- public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
- assert !F.isEmpty(nodeIds);
-
- rwLock.writeLock().lock();
-
- try {
- boolean firstRmv = true;
-
- Collection<TcpDiscoveryNode> res = null;
-
- for (UUID id : nodeIds) {
- TcpDiscoveryNode rmv = nodesMap.remove(id);
-
- if (rmv != null) {
- if (firstRmv) {
- nodes = new TreeSet<>(nodes);
-
- res = new ArrayList<>(nodeIds.size());
-
- firstRmv = false;
- }
-
- nodes.remove(rmv);
-
- res.add(rmv);
- }
- }
-
- return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res;
- }
- finally {
- rwLock.writeLock().unlock();
- }
- }
-
- /**
* Removes all remote nodes, leaves only local node.
* <p>
* This should be called when SPI should be disconnected from topology and
@@ -397,6 +369,7 @@ public class TcpDiscoveryNodesRing {
nodesMap.put(locNode.id(), locNode);
nodeOrder = 0;
+ maxInternalOrder = 0;
topVer = 0;
}
@@ -622,13 +595,8 @@ public class TcpDiscoveryNodesRing {
rwLock.writeLock().lock();
try {
- if (nodeOrder == 0) {
- TcpDiscoveryNode last = nodes.last();
-
- assert last != null;
-
- nodeOrder = last.internalOrder();
- }
+ if (nodeOrder == 0)
+ nodeOrder = maxInternalOrder();
return ++nodeOrder;
}
@@ -681,4 +649,4 @@ public class TcpDiscoveryNodesRing {
rwLock.readLock().unlock();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
/** ID of the message to discard (this and all preceding). */
private final IgniteUuid msgId;
+ /** True if this is discard ID for custom event message. */
+ private final boolean customMsgDiscard;
+
/**
* Constructor.
*
* @param creatorNodeId Creator node ID.
* @param msgId Message ID.
*/
- public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
super(creatorNodeId);
this.msgId = msgId;
+ this.customMsgDiscard = customMsgDiscard;
}
/**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
return msgId;
}
+ /**
+ * Flag indicating whether the ID to discard is for a custom message or not.
+ *
+ * @return Custom message flag.
+ */
+ public boolean customMessageDiscard() {
+ return customMsgDiscard;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index c6a469f..1b99a56 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -101,4 +101,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..5a7146d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discarded message ID. */
private IgniteUuid discardMsgId;
+ /** Discarded message ID. */
+ private IgniteUuid discardCustomMsgId;
+
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
@@ -117,14 +120,28 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * Gets discarded custom message ID.
+ *
+ * @return Discarded message ID.
+ */
+ @Nullable public IgniteUuid discardedCustomMessageId() {
+ return discardCustomMsgId;
+ }
+
+ /**
* Sets pending messages to send to new node.
*
* @param msgs Pending messages to send to new node.
* @param discardMsgId Discarded message ID.
*/
- public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ public void messages(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
this.msgs = msgs;
this.discardMsgId = discardMsgId;
+ this.discardCustomMsgId = discardCustomMsgId;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
deleted file mode 100644
index 7f0ca11..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteFutureTimeoutException;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class CacheAffEarlySelfTest extends GridCommonAbstractTest {
- /** Grid count. */
- private static int GRID_CNT = 8;
-
- /** Operation timeout. */
- private static long OP_TIMEOUT = 5000;
-
- /** Always dump threads or only once per operation. */
- private static boolean ALWAYS_DUMP_THREADS = false;
-
- /** Stopped. */
- private volatile boolean stopped;
-
- /** Iteration. */
- private int iters = 10;
-
- /** Concurrent. */
- private boolean concurrent = true;
-
- /** Futs. */
- private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT);
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true);
- finder.setAddresses(Collections.singletonList("127.0.0.1:47500..47510"));
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
- discoSpi.setIpFinder(finder);
-
- cfg.setDiscoverySpi(discoSpi);
-
- OptimizedMarshaller marsh = new OptimizedMarshaller();
- marsh.setRequireSerializable(false);
-
- cfg.setMarshaller(marsh);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 6 * 60 * 1000L;
- }
-
- /**
- *
- */
- public void testStartNodes() throws Exception {
- for (int i = 0; i < iters; i++) {
- try {
- System.out.println("*** Iteration " + (i + 1) + '/' + iters);
-
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- doTest();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- fut.get(30000);
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- // No-op.
- }
- finally {
- stopAllGrids(true);
- }
- }
- }
-
- /**
- *
- */
- public void doTest() throws Exception {
- for (int i = 0; i < GRID_CNT; i++) {
- final int idx = i;
-
- final Ignite grid = concurrent ? null : startGrid(idx);
-
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- Random rnd = new Random();
-
- try {
- final Ignite ignite = grid == null ? startGrid(idx) : grid;
-
- final IgniteCache<UUID, UUID> cache = getCache(ignite).withAsync();
-
- CacheAffEarlySelfTest.this.execute(cache, new IgniteInClosure<IgniteCache<UUID,UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
- }
- });
-
- while (!stopped) {
- int val = Math.abs(rnd.nextInt(100));
- if (val >= 0 && val < 40)
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.containsKey(ignite.cluster().localNode().id());
- }
- });
- else if (val >= 40 && val < 80)
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.get(ignite.cluster().localNode().id());
- }
- });
- else
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
- }
- });
-
- Thread.sleep(50);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- futs.add(fut);
- }
-
- Thread.sleep(10000);
-
- stopped = true;
-
- for (IgniteInternalFuture<?> fut : futs)
- fut.get();
- }
-
- /**
- * @param cache Cache.
- * @param c Closure.
- */
- private void execute(IgniteCache<UUID, UUID> cache, IgniteInClosure<IgniteCache<UUID, UUID>> c) {
- c.apply(cache);
-
- IgniteFuture<Object> fut = cache.future();
-
- boolean success = false;
-
- int iter = 0;
-
- while (!success) {
- try {
- fut.get(OP_TIMEOUT);
-
- success = true;
- }
- catch (IgniteFutureTimeoutException e) {
- debug(iter == 0 || ALWAYS_DUMP_THREADS);
- }
-
- iter++;
- }
- }
-
- /**
- *
- */
- private void debug(boolean dumpThreads) {
- log.info("DUMPING DEBUG INFO:");
-
- for (Ignite ignite : G.allGrids())
- ((IgniteKernal)ignite).dumpDebugInfo();
-
- if (dumpThreads) {
- U.dumpThreads(null);
-
- U.dumpThreads(log);
- }
- }
-
- /**
- * @param grid Grid.
- */
- private IgniteCache<UUID, UUID> getCache(Ignite grid) {
- CacheConfiguration<UUID, UUID> ccfg = defaultCacheConfiguration();
-
- ccfg.setCacheMode(CacheMode.PARTITIONED);
- ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- ccfg.setBackups(1);
- ccfg.setNearConfiguration(null);
-
- return grid.getOrCreateCache(ccfg);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
new file mode 100644
index 0000000..6b67139
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheAffinityEarlyTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static int GRID_CNT = 8;
+
+ /** Stopped. */
+ private volatile boolean stopped;
+
+ /** Iteration. */
+ private static final int iters = 10;
+
+ /** Concurrent. */
+ private static final boolean concurrent = true;
+
+ /** Futs. */
+ private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT);
+
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ OptimizedMarshaller marsh = new OptimizedMarshaller();
+ marsh.setRequireSerializable(false);
+
+ cfg.setMarshaller(marsh);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 6 * 60 * 1000L;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartNodes() throws Exception {
+ for (int i = 0; i < iters; i++) {
+ try {
+ log.info("Iteration: " + (i + 1) + '/' + iters);
+
+ doTest();
+ }
+ finally {
+ stopAllGrids(true);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest() throws Exception {
+ final AtomicBoolean failed = new AtomicBoolean();
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ final int idx = i;
+
+ final Ignite grid = concurrent ? null : startGrid(idx);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ Random rnd = new Random();
+
+ try {
+ Ignite ignite = grid == null ? startGrid(idx) : grid;
+
+ IgniteCache<Object, Object> cache = getCache(ignite);
+
+ cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+ while (!stopped) {
+ int val = Math.abs(rnd.nextInt(100));
+
+ if (val >= 0 && val < 40)
+ cache.containsKey(ignite.cluster().localNode().id());
+ else if (val >= 40 && val < 80)
+ cache.get(ignite.cluster().localNode().id());
+ else
+ cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+ Thread.sleep(50);
+ }
+ }
+ catch (Exception e) {
+ log.error("Unexpected error: " + e, e);
+
+ failed.set(true);
+ }
+ }
+ }, 1);
+
+ futs.add(fut);
+ }
+
+ Thread.sleep(10_000);
+
+ stopped = true;
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ assertFalse(failed.get());
+ }
+
+ /**
+ * @param grid Grid.
+ * @return Cache.
+ */
+ private IgniteCache getCache(Ignite grid) {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ ccfg.setBackups(1);
+ ccfg.setNearConfiguration(null);
+
+ return grid.getOrCreateCache(ccfg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 7451911..18c8d8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
@Override protected int iterationCount() {
return 100_000;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 16fa662..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return 3 * 60 * 1000;
+ return 5 * 60 * 1000;
}
/**
@@ -249,35 +249,48 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
* @throws Exception If any error occurs.
*/
public void testMultipleStartOnCoordinatorStop() throws Exception{
- clientFlagGlobal = false;
+ for (int k = 0; k < 3; k++) {
+ log.info("Iteration: " + k);
- startGrids(GRID_CNT);
+ clientFlagGlobal = false;
- final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4);
+ final int START_NODES = 5;
+ final int JOIN_NODES = 10;
- final AtomicInteger startIdx = new AtomicInteger(GRID_CNT);
+ startGrids(START_NODES);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- barrier.await();
+ final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1);
- Ignite ignite = startGrid(startIdx.getAndIncrement());
+ final AtomicInteger startIdx = new AtomicInteger(START_NODES);
- assertFalse(ignite.configuration().isClientMode());
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx = startIdx.getAndIncrement();
- log.info("Started node: " + ignite.name());
+ Thread.currentThread().setName("start-thread-" + idx);
- return null;
- }
- }, GRID_CNT + 3, "start-thread");
+ barrier.await();
- barrier.await();
+ Ignite ignite = startGrid(idx);
- U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+ assertFalse(ignite.configuration().isClientMode());
- for (int i = 0; i < GRID_CNT; i++)
- stopGrid(i);
+ log.info("Started node: " + ignite.name());
+
+ return null;
+ }
+ }, JOIN_NODES, "start-thread");
- fut.get();
+ barrier.await();
+
+ U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+ for (int i = 0; i < START_NODES; i++)
+ stopGrid(i);
+
+ fut.get();
+
+ stopAllGrids();
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 981f649..0280e9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -35,8 +35,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.port.GridPortRecord;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
@@ -52,11 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -87,6 +93,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/** */
private UUID nodeId;
+ /** */
+ private TcpDiscoverySpi nodeSpi;
+
/**
* @throws Exception If fails.
*/
@@ -99,8 +108,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
- new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+ TcpDiscoverySpi spi = nodeSpi;
+
+ if (spi == null)
+ spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+ new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
discoMap.put(gridName, spi);
@@ -1164,6 +1176,305 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_1() throws Exception {
+ try {
+ customEventRace1(true, false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_2() throws Exception {
+ try {
+ customEventRace1(false, false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_3() throws Exception {
+ try {
+ customEventRace1(true, true);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param cacheStartFrom1 If {code true} starts cache from node1.
+ * @param stopCrd If {@code true} stops coordinator.
+ * @throws Exception If failed
+ */
+ private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
+ TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
+
+ nodeSpi = spi0;
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ final Ignite ignite1 = startGrid(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ spi0.nodeAdded1 = latch1;
+ spi0.nodeAdded2 = latch2;
+ spi0.debug = true;
+
+ IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start 2");
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ Ignite ignite2 = startGrid(2);
+
+ return null;
+ }
+ });
+
+ latch1.await();
+
+ final String CACHE_NAME = "cache";
+
+ IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+
+ Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0;
+
+ ignite.createCache(ccfg);
+
+ return null;
+ }
+ });
+
+ if (stopCrd) {
+ spi0.stop = true;
+
+ latch2.countDown();
+
+ ignite0.close();
+ }
+ else {
+ U.sleep(500);
+
+ latch2.countDown();
+ }
+
+ fut1.get();
+ fut2.get();
+
+ IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ Ignite ignite = startGrid(3);
+
+ cache = ignite.cache(CACHE_NAME);
+
+ cache.put(2, 2);
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventCoordinatorFailure1() throws Exception {
+ try {
+ customEventCoordinatorFailure(true);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventCoordinatorFailure2() throws Exception {
+ try {
+ customEventCoordinatorFailure(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param twoNodes If {@code true} starts two nodes, otherwise three.
+ * @throws Exception If failed
+ */
+ private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
+ TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi();
+
+ nodeSpi = spi0;
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite1 = startGrid(1);
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite2 = twoNodes ? null : startGrid(2);
+
+ final Ignite createCacheNode = ignite2 != null ? ignite2 : ignite1;
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ spi0.latch = latch;
+
+ final String CACHE_NAME = "test-cache";
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Create test cache");
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+
+ createCacheNode.createCache(ccfg);
+
+ return null;
+ }
+ }, "create-cache-thread");
+
+ ((TcpCommunicationSpi)ignite0.configuration().getCommunicationSpi()).simulateNodeFailure();
+
+ latch.await();
+
+ ignite0.close();
+
+ fut.get();
+
+ IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ log.info("Try start one more node.");
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite = startGrid(twoNodes ? 2 : 3);
+
+ cache = ignite.cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(2, 2);
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+ }
+
+ /**
+ *
+ */
+ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ private boolean stop;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) {
+ log.info("Stop node on custom event: " + msg);
+
+ latch.countDown();
+
+ stop = true;
+ }
+
+ if (stop)
+ return;
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi {
+ /** */
+ private volatile CountDownLatch nodeAdded1;
+
+ /** */
+ private volatile CountDownLatch nodeAdded2;
+
+ /** */
+ private volatile boolean stop;
+
+ /** */
+ private boolean debug;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ if (nodeAdded1 != null) {
+ nodeAdded1.countDown();
+
+ if (debug)
+ log.info("--- Wait node added: " + msg);
+
+ U.await(nodeAdded2);
+
+ nodeAdded1 = null;
+ nodeAdded2 = null;
+ }
+
+ if (stop)
+ return;
+
+ if (debug)
+ log.info("--- Send node added: " + msg);
+ }
+
+ if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ log.info("--- Send node finished: " + msg);
+
+ if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
+ log.info("--- Send custom event: " + msg);
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
* Starts new grid with given index. Method optimize is not invoked.
*
* @param idx Index of the grid to start.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 88977fb..289da3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCacheTypesTest;
import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -195,6 +196,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class);
suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class);
+ suite.addTestSuite(CacheAffinityEarlyTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);