You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/24 08:11:34 UTC
[1/6] ignite git commit: ignite-1524 Fixed processing of
ClientReconnectMessage
Repository: ignite
Updated Branches:
refs/heads/master b3bcf4aee -> b56b15cda
ignite-1524 Fixed processing of ClientReconnectMessage
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04f4f54a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04f4f54a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04f4f54a
Branch: refs/heads/master
Commit: 04f4f54a7ff1d43fa3baf4fa07865a8163796a82
Parents: 1942d75
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 23 09:31:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 23 09:31:59 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 97 +++++++++++---------
...lientDiscoverySpiFailureTimeoutSelfTest.java | 33 +++++--
2 files changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/04f4f54a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4ce46e8..8a205d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2958,70 +2958,81 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Client reconnect message.
*/
private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+ UUID nodeId = msg.creatorNodeId();
+
UUID locNodeId = getLocalNodeId();
boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
if (!msg.verified()) {
- assert isLocNodeRouter;
-
- msg.verify(locNodeId);
+ TcpDiscoveryNode node = ring.node(nodeId);
- if (ring.hasRemoteNodes()) {
- sendMessageAcrossRing(msg);
+ assert node == null || node.isClient();
- return;
+ if (node != null) {
+ node.clientRouterNodeId(msg.routerNodeId());
+ node.aliveCheck(spi.maxMissedClientHbs);
}
- }
-
- UUID nodeId = msg.creatorNodeId();
- TcpDiscoveryNode node = ring.node(nodeId);
+ if (isLocalNodeCoordinator()) {
+ msg.verify(locNodeId);
- assert node == null || node.isClient();
+ if (node != null) {
+ Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
- if (node != null) {
- assert node.isClient();
+ if (pending != null) {
+ msg.pendingMessages(pending);
+ msg.success(true);
- node.clientRouterNodeId(msg.routerNodeId());
- node.aliveCheck(spi.maxMissedClientHbs);
+ if (log.isDebugEnabled())
+ log.debug("Accept client reconnect, restored pending messages " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failing reconnecting client node because failed to restore pending " +
+ "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
- if (isLocalNodeCoordinator()) {
- Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+ processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
+ node.id(), node.internalOrder()));
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
- if (pending != null) {
- msg.pendingMessages(pending);
- msg.success(true);
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
- if (log.isDebugEnabled())
- log.debug("Accept client reconnect, restored pending messages " +
- "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
}
else {
- if (log.isDebugEnabled())
- log.debug("Failing reconnecting client node because failed to restore pending " +
- "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
- processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
- node.id(), node.internalOrder()));
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
}
}
- }
- else if (log.isDebugEnabled())
- log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
+ else {
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+ }
}
else {
- if (ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId()))
+ sendMessageAcrossRing(msg);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/04f4f54a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 14417c1..344efc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -170,11 +172,26 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnCoordinatorRouterFail1() throws Exception {
+ clientReconnectOnCoordinatorRouterFail(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnCoordinatorRouterFail2() throws Exception {
+ clientReconnectOnCoordinatorRouterFail(2);
+ }
+
+ /**
* Test tries to provoke scenario when client sends reconnect message before router failure detected.
*
+ * @param srvNodes Number of additional server nodes.
* @throws Exception If failed.
*/
- public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+ public void clientReconnectOnCoordinatorRouterFail(int srvNodes) throws Exception {
startServerNodes(1);
Ignite srv = G.ignite("server-0");
@@ -189,24 +206,28 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
failureThreshold = 1000L;
- netTimeout = 500L;
+ netTimeout = 1000L;
startClientNodes(1); // Client should connect to coordinator.
failureThreshold = 10_000L;
netTimeout = 5000L;
- for (int i = 0; i < 2; i++) {
+ List<String> nodes = new ArrayList<>();
+
+ for (int i = 0; i < srvNodes; i++) {
Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+ nodes.add(g.name());
+
srvNodeIds.add(g.cluster().localNode().id());
}
- checkNodes(3, 1);
+ checkNodes(1 + srvNodes, 1);
- final CountDownLatch latch = new CountDownLatch(3);
+ nodes.add("client-0");
- String nodes[] = {"server-1", "server-2", "client-0"};
+ final CountDownLatch latch = new CountDownLatch(nodes.size());
final AtomicBoolean err = new AtomicBoolean();
[3/6] ignite git commit: Merging IGNITE-1171 - fixed problems with
custom events in discovery
Posted by sb...@apache.org.
Merging IGNITE-1171 - fixed problems with custom events in discovery
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f3ef6a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f3ef6a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f3ef6a8
Branch: refs/heads/master
Commit: 6f3ef6a84ee1c3e77d32ca9930835d1720918e20
Parents: 517d0f5
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Sep 23 16:36:15 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Sep 23 16:36:15 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 2 +-
.../cache/DynamicCacheDescriptor.java | 10 +-
.../GridCachePartitionExchangeManager.java | 6 +
.../processors/cache/GridCacheProcessor.java | 18 +-
.../continuous/CacheContinuousQueryManager.java | 10 +-
.../communication/tcp/TcpCommunicationSpi.java | 7 +-
.../discovery/DiscoverySpiCustomMessage.java | 12 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 301 ++++++++++++++----
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 94 ++----
.../messages/TcpDiscoveryDiscardMessage.java | 15 +-
.../TcpDiscoveryNodeAddFinishedMessage.java | 2 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 19 +-
.../distributed/CacheAffEarlySelfTest.java | 245 ---------------
.../distributed/CacheAffinityEarlyTest.java | 168 ++++++++++
...GridCacheValueConsistencyAtomicSelfTest.java | 2 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 53 ++--
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 315 ++++++++++++++++++-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
20 files changed, 864 insertions(+), 425 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
index 83e2525..bac1a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
@@ -160,4 +160,4 @@ public interface IgniteAtomicLong extends Closeable {
* @throws IgniteException If operation failed.
*/
@Override public void close();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index f3c3be9..3cfc34e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -39,9 +39,6 @@ public class DynamicCacheDescriptor {
@GridToStringExclude
private CacheConfiguration cacheCfg;
- /** Cancelled flag. */
- private boolean cancelled;
-
/** Locally configured flag. */
private boolean locCfg;
@@ -156,6 +153,13 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Started flag.
+ */
+ public boolean started() {
+ return started;
+ }
+
+ /**
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 34c571c..eb76233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1435,6 +1435,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private static final long serialVersionUID = 0L;
/** */
+ @GridToStringInclude
private AffinityTopologyVersion topVer;
/**
@@ -1455,5 +1456,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return done;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AffinityReadyFuture.class, this, super.toString());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e92ea57..74124bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1522,10 +1522,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Collection of started cache names.
*/
public Collection<String> cacheNames() {
- return F.viewReadOnly(registeredCaches.keySet(),
- new IgniteClosure<String, String>() {
- @Override public String apply(String s) {
- return unmaskNull(s);
+ return F.viewReadOnly(registeredCaches.values(),
+ new IgniteClosure<DynamicCacheDescriptor, String>() {
+ @Override public String apply(DynamicCacheDescriptor desc) {
+ return desc.cacheConfiguration().getName();
+ }
+ },
+ new IgnitePredicate<DynamicCacheDescriptor>() {
+ @Override public boolean apply(DynamicCacheDescriptor desc) {
+ return desc.started();
}
});
}
@@ -1568,6 +1573,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.deploymentId(),
topVer
);
+
+ DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+
+ if (desc != null)
+ desc.onStart();
}
// Start statically configured caches received from remote nodes during exchange.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..c719f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -448,8 +448,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
taskNameHash,
skipPrimaryCheck);
- UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
- autoUnsubscribe, grp.predicate()).get();
+ UUID id = cctx.kernalContext().continuous().startRoutine(
+ hnd,
+ bufSize,
+ timeInterval,
+ autoUnsubscribe,
+ grp.predicate()).get();
if (notifyExisting) {
final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -811,4 +815,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2594213..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
timeoutHelper.checkFailureTimeoutReached(e))) {
- log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
- failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+ if (log.isDebugEnabled())
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+ failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
@@ -2700,7 +2701,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*
* FOR TEST PURPOSES ONLY!!!
*/
- void simulateNodeFailure() {
+ public void simulateNodeFailure() {
if (nioSrvr != null)
nioSrvr.stop();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
package org.apache.ignite.spi.discovery;
import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.jetbrains.annotations.Nullable;
/**
* Message to send across ring.
*
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
* @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
*/
public boolean isMutable();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8a205d2..d8ee953 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,10 +37,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
@@ -64,6 +67,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -145,7 +149,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
/**
*
*/
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@SuppressWarnings("All")
class ServerImpl extends TcpDiscoveryImpl {
/** */
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
@@ -1368,8 +1372,13 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Messages to include.
* @param discardMsgId Discarded message ID.
*/
- private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
- @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ private void prepareNodeAddedMessage(
+ TcpDiscoveryAbstractMessage msg,
+ UUID destNodeId,
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
assert destNodeId != null;
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1393,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
nodeAddedMsg.topology(topToSnd);
- nodeAddedMsg.messages(msgs, discardMsgId);
+ nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
Map<Long, Collection<ClusterNode>> hist;
@@ -1416,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topology(null);
nodeAddedMsg.topologyHistory(null);
- nodeAddedMsg.messages(null, null);
+ nodeAddedMsg.messages(null, null, null);
}
}
@@ -1825,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
- prepareNodeAddedMessage(msg, destNodeId, null, null);
+ prepareNodeAddedMessage(msg, destNodeId, null, null, null);
return msg;
}
@@ -1834,16 +1843,22 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* Pending messages container.
*/
- private static class PendingMessages {
+ private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
/** */
private static final int MAX = 1024;
/** Pending messages. */
private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+ /** Processed custom message IDs. */
+ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
+
/** Discarded message ID. */
private IgniteUuid discardId;
+ /** Discarded message ID. */
+ private IgniteUuid customDiscardId;
+
/**
* Adds pending message and shrinks queue if it exceeds limit
* (messages that were not discarded yet are never removed).
@@ -1869,31 +1884,118 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msgs Message.
* @param discardId Discarded message ID.
*/
- void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+ void reset(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardId,
+ @Nullable IgniteUuid customDiscardId
+ ) {
this.msgs.clear();
if (msgs != null)
this.msgs.addAll(msgs);
this.discardId = discardId;
+ this.customDiscardId = customDiscardId;
}
/**
- * Clears pending messages.
+ * Discards message with provided ID and all before it.
+ *
+ * @param id Discarded message ID.
*/
- void clear() {
- msgs.clear();
+ void discard(IgniteUuid id, boolean custom) {
+ if (custom)
+ customDiscardId = id;
+ else
+ discardId = id;
+ }
- discardId = null;
+ /**
+ * Gets iterator for non-discarded messages.
+ *
+ * @return Non-discarded messages iterator.
+ */
+ public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+ return new SkipIterator();
}
/**
- * Discards message with provided ID and all before it.
*
- * @param id Discarded message ID.
*/
- void discard(IgniteUuid id) {
- discardId = id;
+ private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage> {
+ /** Skip non-custom messages flag. */
+ private boolean skipMsg = discardId != null;
+
+ /** Skip custom messages flag. */
+ private boolean skipCustomMsg;
+
+ /** Internal iterator. */
+ private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+ /** Next message. */
+ private TcpDiscoveryAbstractMessage next;
+
+ {
+ advance();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TcpDiscoveryAbstractMessage next() {
+ if (next == null)
+ throw new NoSuchElementException();
+
+ TcpDiscoveryAbstractMessage next0 = next;
+
+ advance();
+
+ return next0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Advances iterator to the next available item.
+ */
+ private void advance() {
+ next = null;
+
+ while (msgIt.hasNext()) {
+ TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+ if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+ if (skipCustomMsg) {
+ assert customDiscardId != null;
+
+ if (F.eq(customDiscardId, msg0.id()))
+ skipCustomMsg = false;
+
+ continue;
+ }
+ }
+ else {
+ if (skipMsg) {
+ assert discardId != null;
+
+ if (F.eq(discardId, msg0.id()))
+ skipMsg = false;
+
+ continue;
+ }
+ }
+
+ next = msg0;
+
+ break;
+ }
+ }
}
}
@@ -1941,6 +2043,12 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Connection check threshold. */
private long connCheckThreshold;
+ /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
+ private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
+
+ /** Collection to track joining nodes. */
+ private Set<UUID> joiningNodes = new HashSet<>();
+
/**
*/
protected RingMessageWorker() {
@@ -2046,6 +2154,8 @@ class ServerImpl extends TcpDiscoveryImpl {
sendHeartbeatMessage();
checkHeartbeatsReceiving();
+
+ checkPendingCustomMessages();
}
/**
@@ -2323,20 +2433,11 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending messages will be sent [failure=" + failure +
", forceSndPending=" + forceSndPending + ']');
- boolean skip = pendingMsgs.discardId != null;
-
- for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
- if (skip) {
- if (pendingMsg.id().equals(pendingMsgs.discardId))
- skip = false;
-
- continue;
- }
-
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
long tstamp = U.currentTimeMillis();
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
- pendingMsgs.discardId);
+ pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2354,13 +2455,13 @@ class ServerImpl extends TcpDiscoveryImpl {
int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
- log.debug("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
if (debugMode)
- debugLog("Pending message has been sent to next node [msg=" + msg.id() +
- ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
", res=" + res + ']');
// Resetting timeout control object to create a new one for the next bunch of
@@ -2377,7 +2478,8 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
}
else
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
try {
long tstamp = U.currentTimeMillis();
@@ -2478,21 +2580,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg instanceof TcpDiscoveryStatusCheckMessage) {
- TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg;
-
- if (next.id().equals(msg0.failedNodeId())) {
- next = null;
-
- if (log.isDebugEnabled())
- log.debug("Discarding status check since next node has indeed failed [next=" + next +
- ", msg=" + msg + ']');
-
- // Discard status check message by exiting loop and handle failure.
- break;
- }
- }
-
next = null;
searchNext = true;
@@ -2524,6 +2611,29 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryNode n : failedNodes)
msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));
+ if (!sent) {
+ if (log.isDebugEnabled())
+ log.debug("Pending messages will be resent to local node");
+
+ if (debugMode)
+ log.debug("Pending messages will be resent to local node");
+
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+ prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+ pendingMsgs.customDiscardId);
+
+ msgWorker.addMessage(pendingMsg);
+
+ if (log.isDebugEnabled())
+ log.debug("Pending message has been sent to local node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+
+ if (debugMode)
+ debugLog("Pending message has been sent to local node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+ }
+ }
+
LT.warn(log, null, "Local node has detected failed nodes and started cluster-wide procedure. " +
"To speed up failure detection please see 'Failure Detection' section under javadoc" +
" for 'TcpDiscoverySpi'");
@@ -3077,7 +3187,7 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3118,6 +3228,8 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ joiningNodes.add(node.id());
+
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
@@ -3222,6 +3334,8 @@ class ServerImpl extends TcpDiscoveryImpl {
n.visible(true);
}
+ joiningNodes.clear();
+
locNode.setAttributes(node.attributes());
locNode.visible(true);
@@ -3237,10 +3351,11 @@ class ServerImpl extends TcpDiscoveryImpl {
topHist.clear();
topHist.putAll(msg.topologyHistory());
- pendingMsgs.discard(msg.discardedMessageId());
+ pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+ msg.discardedCustomMessageId());
// Clear data to minimize message size.
- msg.messages(null, null);
+ msg.messages(null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
@@ -3307,7 +3422,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3342,7 +3457,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+ joiningNodes.remove(nodeId);
+
+ TcpDiscoverySpiState state = spiStateCopy();
+
+ if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) {
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3357,7 +3476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
- ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
+ ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
@@ -3365,7 +3484,8 @@ class ServerImpl extends TcpDiscoveryImpl {
lastMsg = msg;
}
- notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+ if (state == CONNECTED)
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node);
try {
if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3381,7 +3501,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
+ if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING) {
assert node != null;
assert topVer > 0 : "Invalid topology version: " + msg;
@@ -3402,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
+
+ checkPendingCustomMessages();
}
/**
@@ -3481,7 +3603,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3553,6 +3675,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ joiningNodes.remove(leftNode.id());
+
spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3580,6 +3704,8 @@ class ServerImpl extends TcpDiscoveryImpl {
U.closeQuiet(sock);
}
+
+ checkPendingCustomMessages();
}
/**
@@ -3650,7 +3776,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
}
@@ -3707,6 +3833,8 @@ class ServerImpl extends TcpDiscoveryImpl {
", msg=" + msg.warning() + ']');
}
+ joiningNodes.remove(node.id());
+
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
@@ -3720,6 +3848,8 @@ class ServerImpl extends TcpDiscoveryImpl {
U.closeQuiet(sock);
}
+
+ checkPendingCustomMessages();
}
/**
@@ -4046,7 +4176,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified())
- pendingMsgs.discard(msgId);
+ pendingMsgs.discard(msgId, msg.customMessageDiscard());
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4098,18 +4228,23 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
- boolean sndNext;
+ if (!joiningNodes.isEmpty()) {
+ pendingCustomMsgs.add(msg);
- if (!msg.verified()) {
+ return;
+ }
+
+ boolean sndNext = !msg.verified();
+
+ if (sndNext) {
msg.verify(getLocalNodeId());
msg.topologyVersion(ring.topologyVersion());
- notifyDiscoveryListener(msg);
-
- sndNext = true;
+ if (pendingMsgs.procCustomMsgs.add(msg.id()))
+ notifyDiscoveryListener(msg);
+ else
+ sndNext = false;
}
- else
- sndNext = false;
if (sndNext && ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4139,12 +4274,30 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
}
}
else {
- if (msg.verified())
+ TcpDiscoverySpiState state0;
+
+ synchronized (mux) {
+ state0 = spiState;
+ }
+
+ if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']');
+
+ return;
+ }
+
+ if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+ assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
+ ", topver=" + ring.topologyVersion();
+ assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion();
+
notifyDiscoveryListener(msg);
+ }
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
@@ -4152,6 +4305,18 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Checks and flushes custom event messages if no nodes are attempting to join the grid.
+ */
+ private void checkPendingCustomMessages() {
+ if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
+ TcpDiscoveryCustomEventMessage msg;
+
+ while ((msg = pendingCustomMsgs.poll()) != null)
+ processCustomMessage(msg);
+ }
+ }
+
+ /**
* @param msg Custom message.
*/
private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
@@ -5081,7 +5246,7 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- prepareNodeAddedMessage(msg, clientNodeId, null, null);
+ prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
return res;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 80fcc46..6254605 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2038,4 +2038,4 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
return S.toString(SocketTimeoutObject.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 2b17696..7ca092c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,7 +17,17 @@
package org.apache.ignite.spi.discovery.tcp.internal;
-import java.util.ArrayList;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -29,16 +39,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.Nullable;
/**
* Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -81,6 +81,9 @@ public class TcpDiscoveryNodesRing {
/** */
private long nodeOrder;
+ /** */
+ private long maxInternalOrder;
+
/** Lock. */
@GridToStringExclude
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -99,6 +102,8 @@ public class TcpDiscoveryNodesRing {
this.locNode = locNode;
clear();
+
+ maxInternalOrder = locNode.internalOrder();
}
finally {
rwLock.writeLock().unlock();
@@ -204,7 +209,9 @@ public class TcpDiscoveryNodesRing {
if (nodesMap.containsKey(node.id()))
return false;
- assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+ long maxInternalOrder0 = maxInternalOrder();
+
+ assert node.internalOrder() > maxInternalOrder0 : "Adding node to the middle of the ring " +
"[ring=" + this + ", node=" + node + ']';
nodesMap.put(node.id(), node);
@@ -216,6 +223,8 @@ public class TcpDiscoveryNodesRing {
nodes.add(node);
nodeOrder = node.internalOrder();
+
+ maxInternalOrder = node.internalOrder();
}
finally {
rwLock.writeLock().unlock();
@@ -231,9 +240,13 @@ public class TcpDiscoveryNodesRing {
rwLock.readLock().lock();
try {
- TcpDiscoveryNode last = nodes.last();
+ if (maxInternalOrder == 0) {
+ TcpDiscoveryNode last = nodes.last();
+
+ return last != null ? maxInternalOrder = last.internalOrder() : -1;
+ }
- return last != null ? last.internalOrder() : -1;
+ return maxInternalOrder;
}
finally {
rwLock.readLock().unlock();
@@ -336,47 +349,6 @@ public class TcpDiscoveryNodesRing {
}
/**
- * Removes nodes from the topology.
- *
- * @param nodeIds IDs of the nodes to remove.
- * @return Collection of removed nodes.
- */
- public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
- assert !F.isEmpty(nodeIds);
-
- rwLock.writeLock().lock();
-
- try {
- boolean firstRmv = true;
-
- Collection<TcpDiscoveryNode> res = null;
-
- for (UUID id : nodeIds) {
- TcpDiscoveryNode rmv = nodesMap.remove(id);
-
- if (rmv != null) {
- if (firstRmv) {
- nodes = new TreeSet<>(nodes);
-
- res = new ArrayList<>(nodeIds.size());
-
- firstRmv = false;
- }
-
- nodes.remove(rmv);
-
- res.add(rmv);
- }
- }
-
- return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res;
- }
- finally {
- rwLock.writeLock().unlock();
- }
- }
-
- /**
* Removes all remote nodes, leaves only local node.
* <p>
* This should be called when SPI should be disconnected from topology and
@@ -397,6 +369,7 @@ public class TcpDiscoveryNodesRing {
nodesMap.put(locNode.id(), locNode);
nodeOrder = 0;
+ maxInternalOrder = 0;
topVer = 0;
}
@@ -622,13 +595,8 @@ public class TcpDiscoveryNodesRing {
rwLock.writeLock().lock();
try {
- if (nodeOrder == 0) {
- TcpDiscoveryNode last = nodes.last();
-
- assert last != null;
-
- nodeOrder = last.internalOrder();
- }
+ if (nodeOrder == 0)
+ nodeOrder = maxInternalOrder();
return ++nodeOrder;
}
@@ -681,4 +649,4 @@ public class TcpDiscoveryNodesRing {
rwLock.readLock().unlock();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
/** ID of the message to discard (this and all preceding). */
private final IgniteUuid msgId;
+ /** True if this is discard ID for custom event message. */
+ private final boolean customMsgDiscard;
+
/**
* Constructor.
*
* @param creatorNodeId Creator node ID.
* @param msgId Message ID.
*/
- public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+ public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
super(creatorNodeId);
this.msgId = msgId;
+ this.customMsgDiscard = customMsgDiscard;
}
/**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
return msgId;
}
+ /**
+ * Flag indicating whether the ID to discard is for a custom message or not.
+ *
+ * @return Custom message flag.
+ */
+ public boolean customMessageDiscard() {
+ return customMsgDiscard;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index c6a469f..1b99a56 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -101,4 +101,4 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
@Override public String toString() {
return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..5a7146d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discarded message ID. */
private IgniteUuid discardMsgId;
+ /** Discarded message ID. */
+ private IgniteUuid discardCustomMsgId;
+
/** Current topology. Initialized by coordinator. */
@GridToStringInclude
private Collection<TcpDiscoveryNode> top;
@@ -117,14 +120,28 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
+ * Gets discarded custom message ID.
+ *
+ * @return Discarded message ID.
+ */
+ @Nullable public IgniteUuid discardedCustomMessageId() {
+ return discardCustomMsgId;
+ }
+
+ /**
* Sets pending messages to send to new node.
*
* @param msgs Pending messages to send to new node.
* @param discardMsgId Discarded message ID.
*/
- public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ public void messages(
+ @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+ @Nullable IgniteUuid discardMsgId,
+ @Nullable IgniteUuid discardCustomMsgId
+ ) {
this.msgs = msgs;
this.discardMsgId = discardMsgId;
+ this.discardCustomMsgId = discardCustomMsgId;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
deleted file mode 100644
index 7f0ca11..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffEarlySelfTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteFutureTimeoutException;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class CacheAffEarlySelfTest extends GridCommonAbstractTest {
- /** Grid count. */
- private static int GRID_CNT = 8;
-
- /** Operation timeout. */
- private static long OP_TIMEOUT = 5000;
-
- /** Always dump threads or only once per operation. */
- private static boolean ALWAYS_DUMP_THREADS = false;
-
- /** Stopped. */
- private volatile boolean stopped;
-
- /** Iteration. */
- private int iters = 10;
-
- /** Concurrent. */
- private boolean concurrent = true;
-
- /** Futs. */
- private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT);
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true);
- finder.setAddresses(Collections.singletonList("127.0.0.1:47500..47510"));
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
- discoSpi.setIpFinder(finder);
-
- cfg.setDiscoverySpi(discoSpi);
-
- OptimizedMarshaller marsh = new OptimizedMarshaller();
- marsh.setRequireSerializable(false);
-
- cfg.setMarshaller(marsh);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 6 * 60 * 1000L;
- }
-
- /**
- *
- */
- public void testStartNodes() throws Exception {
- for (int i = 0; i < iters; i++) {
- try {
- System.out.println("*** Iteration " + (i + 1) + '/' + iters);
-
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- doTest();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- fut.get(30000);
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- // No-op.
- }
- finally {
- stopAllGrids(true);
- }
- }
- }
-
- /**
- *
- */
- public void doTest() throws Exception {
- for (int i = 0; i < GRID_CNT; i++) {
- final int idx = i;
-
- final Ignite grid = concurrent ? null : startGrid(idx);
-
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- Random rnd = new Random();
-
- try {
- final Ignite ignite = grid == null ? startGrid(idx) : grid;
-
- final IgniteCache<UUID, UUID> cache = getCache(ignite).withAsync();
-
- CacheAffEarlySelfTest.this.execute(cache, new IgniteInClosure<IgniteCache<UUID,UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
- }
- });
-
- while (!stopped) {
- int val = Math.abs(rnd.nextInt(100));
- if (val >= 0 && val < 40)
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.containsKey(ignite.cluster().localNode().id());
- }
- });
- else if (val >= 40 && val < 80)
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.get(ignite.cluster().localNode().id());
- }
- });
- else
- execute(cache, new IgniteInClosure<IgniteCache<UUID, UUID>>() {
- @Override public void apply(IgniteCache<UUID, UUID> entries) {
- cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
- }
- });
-
- Thread.sleep(50);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- futs.add(fut);
- }
-
- Thread.sleep(10000);
-
- stopped = true;
-
- for (IgniteInternalFuture<?> fut : futs)
- fut.get();
- }
-
- /**
- * @param cache Cache.
- * @param c Closure.
- */
- private void execute(IgniteCache<UUID, UUID> cache, IgniteInClosure<IgniteCache<UUID, UUID>> c) {
- c.apply(cache);
-
- IgniteFuture<Object> fut = cache.future();
-
- boolean success = false;
-
- int iter = 0;
-
- while (!success) {
- try {
- fut.get(OP_TIMEOUT);
-
- success = true;
- }
- catch (IgniteFutureTimeoutException e) {
- debug(iter == 0 || ALWAYS_DUMP_THREADS);
- }
-
- iter++;
- }
- }
-
- /**
- *
- */
- private void debug(boolean dumpThreads) {
- log.info("DUMPING DEBUG INFO:");
-
- for (Ignite ignite : G.allGrids())
- ((IgniteKernal)ignite).dumpDebugInfo();
-
- if (dumpThreads) {
- U.dumpThreads(null);
-
- U.dumpThreads(log);
- }
- }
-
- /**
- * @param grid Grid.
- */
- private IgniteCache<UUID, UUID> getCache(Ignite grid) {
- CacheConfiguration<UUID, UUID> ccfg = defaultCacheConfiguration();
-
- ccfg.setCacheMode(CacheMode.PARTITIONED);
- ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- ccfg.setBackups(1);
- ccfg.setNearConfiguration(null);
-
- return grid.getOrCreateCache(ccfg);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
new file mode 100644
index 0000000..6b67139
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAffinityEarlyTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheAffinityEarlyTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static int GRID_CNT = 8;
+
+ /** Stopped. */
+ private volatile boolean stopped;
+
+ /** Iteration. */
+ private static final int iters = 10;
+
+ /** Concurrent. */
+ private static final boolean concurrent = true;
+
+ /** Futs. */
+ private Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(GRID_CNT);
+
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ OptimizedMarshaller marsh = new OptimizedMarshaller();
+ marsh.setRequireSerializable(false);
+
+ cfg.setMarshaller(marsh);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 6 * 60 * 1000L;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartNodes() throws Exception {
+ for (int i = 0; i < iters; i++) {
+ try {
+ log.info("Iteration: " + (i + 1) + '/' + iters);
+
+ doTest();
+ }
+ finally {
+ stopAllGrids(true);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest() throws Exception {
+ final AtomicBoolean failed = new AtomicBoolean();
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ final int idx = i;
+
+ final Ignite grid = concurrent ? null : startGrid(idx);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ Random rnd = new Random();
+
+ try {
+ Ignite ignite = grid == null ? startGrid(idx) : grid;
+
+ IgniteCache<Object, Object> cache = getCache(ignite);
+
+ cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+ while (!stopped) {
+ int val = Math.abs(rnd.nextInt(100));
+
+ if (val >= 0 && val < 40)
+ cache.containsKey(ignite.cluster().localNode().id());
+ else if (val >= 40 && val < 80)
+ cache.get(ignite.cluster().localNode().id());
+ else
+ cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+ Thread.sleep(50);
+ }
+ }
+ catch (Exception e) {
+ log.error("Unexpected error: " + e, e);
+
+ failed.set(true);
+ }
+ }
+ }, 1);
+
+ futs.add(fut);
+ }
+
+ Thread.sleep(10_000);
+
+ stopped = true;
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ assertFalse(failed.get());
+ }
+
+ /**
+ * @param grid Grid.
+ * @return Cache.
+ */
+ private IgniteCache getCache(Ignite grid) {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ ccfg.setBackups(1);
+ ccfg.setNearConfiguration(null);
+
+ return grid.getOrCreateCache(ccfg);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 7451911..18c8d8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
@Override protected int iterationCount() {
return 100_000;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 16fa662..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return 3 * 60 * 1000;
+ return 5 * 60 * 1000;
}
/**
@@ -249,35 +249,48 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
* @throws Exception If any error occurs.
*/
public void testMultipleStartOnCoordinatorStop() throws Exception{
- clientFlagGlobal = false;
+ for (int k = 0; k < 3; k++) {
+ log.info("Iteration: " + k);
- startGrids(GRID_CNT);
+ clientFlagGlobal = false;
- final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4);
+ final int START_NODES = 5;
+ final int JOIN_NODES = 10;
- final AtomicInteger startIdx = new AtomicInteger(GRID_CNT);
+ startGrids(START_NODES);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- barrier.await();
+ final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1);
- Ignite ignite = startGrid(startIdx.getAndIncrement());
+ final AtomicInteger startIdx = new AtomicInteger(START_NODES);
- assertFalse(ignite.configuration().isClientMode());
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx = startIdx.getAndIncrement();
- log.info("Started node: " + ignite.name());
+ Thread.currentThread().setName("start-thread-" + idx);
- return null;
- }
- }, GRID_CNT + 3, "start-thread");
+ barrier.await();
- barrier.await();
+ Ignite ignite = startGrid(idx);
- U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+ assertFalse(ignite.configuration().isClientMode());
- for (int i = 0; i < GRID_CNT; i++)
- stopGrid(i);
+ log.info("Started node: " + ignite.name());
+
+ return null;
+ }
+ }, JOIN_NODES, "start-thread");
- fut.get();
+ barrier.await();
+
+ U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+ for (int i = 0; i < START_NODES; i++)
+ stopGrid(i);
+
+ fut.get();
+
+ stopAllGrids();
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 981f649..0280e9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -35,8 +35,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.port.GridPortRecord;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
@@ -52,11 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -87,6 +93,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
/** */
private UUID nodeId;
+ /** */
+ private TcpDiscoverySpi nodeSpi;
+
/**
* @throws Exception If fails.
*/
@@ -99,8 +108,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
- new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+ TcpDiscoverySpi spi = nodeSpi;
+
+ if (spi == null)
+ spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+ new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
discoMap.put(gridName, spi);
@@ -1164,6 +1176,305 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_1() throws Exception {
+ try {
+ customEventRace1(true, false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_2() throws Exception {
+ try {
+ customEventRace1(false, false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventRace1_3() throws Exception {
+ try {
+ customEventRace1(true, true);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param cacheStartFrom1 If {code true} starts cache from node1.
+ * @param stopCrd If {@code true} stops coordinator.
+ * @throws Exception If failed
+ */
+ private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
+ TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
+
+ nodeSpi = spi0;
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ final Ignite ignite1 = startGrid(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ spi0.nodeAdded1 = latch1;
+ spi0.nodeAdded2 = latch2;
+ spi0.debug = true;
+
+ IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start 2");
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ Ignite ignite2 = startGrid(2);
+
+ return null;
+ }
+ });
+
+ latch1.await();
+
+ final String CACHE_NAME = "cache";
+
+ IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+
+ Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0;
+
+ ignite.createCache(ccfg);
+
+ return null;
+ }
+ });
+
+ if (stopCrd) {
+ spi0.stop = true;
+
+ latch2.countDown();
+
+ ignite0.close();
+ }
+ else {
+ U.sleep(500);
+
+ latch2.countDown();
+ }
+
+ fut1.get();
+ fut2.get();
+
+ IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ nodeSpi = new TestCustomEventRaceSpi();
+
+ Ignite ignite = startGrid(3);
+
+ cache = ignite.cache(CACHE_NAME);
+
+ cache.put(2, 2);
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventCoordinatorFailure1() throws Exception {
+ try {
+ customEventCoordinatorFailure(true);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testCustomEventCoordinatorFailure2() throws Exception {
+ try {
+ customEventCoordinatorFailure(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param twoNodes If {@code true} starts two nodes, otherwise three.
+ * @throws Exception If failed
+ */
+ private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
+ TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi();
+
+ nodeSpi = spi0;
+
+ Ignite ignite0 = startGrid(0);
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite1 = startGrid(1);
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite2 = twoNodes ? null : startGrid(2);
+
+ final Ignite createCacheNode = ignite2 != null ? ignite2 : ignite1;
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ spi0.latch = latch;
+
+ final String CACHE_NAME = "test-cache";
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Create test cache");
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+
+ createCacheNode.createCache(ccfg);
+
+ return null;
+ }
+ }, "create-cache-thread");
+
+ ((TcpCommunicationSpi)ignite0.configuration().getCommunicationSpi()).simulateNodeFailure();
+
+ latch.await();
+
+ ignite0.close();
+
+ fut.get();
+
+ IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(1, 1);
+
+ assertEquals(1, cache.get(1));
+
+ log.info("Try start one more node.");
+
+ nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+
+ Ignite ignite = startGrid(twoNodes ? 2 : 3);
+
+ cache = ignite.cache(CACHE_NAME);
+
+ assertNotNull(cache);
+
+ cache.put(2, 2);
+
+ assertEquals(1, cache.get(1));
+ assertEquals(2, cache.get(2));
+ }
+
+ /**
+ *
+ */
+ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ private boolean stop;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) {
+ log.info("Stop node on custom event: " + msg);
+
+ latch.countDown();
+
+ stop = true;
+ }
+
+ if (stop)
+ return;
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi {
+ /** */
+ private volatile CountDownLatch nodeAdded1;
+
+ /** */
+ private volatile CountDownLatch nodeAdded2;
+
+ /** */
+ private volatile boolean stop;
+
+ /** */
+ private boolean debug;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ if (nodeAdded1 != null) {
+ nodeAdded1.countDown();
+
+ if (debug)
+ log.info("--- Wait node added: " + msg);
+
+ U.await(nodeAdded2);
+
+ nodeAdded1 = null;
+ nodeAdded2 = null;
+ }
+
+ if (stop)
+ return;
+
+ if (debug)
+ log.info("--- Send node added: " + msg);
+ }
+
+ if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ log.info("--- Send node finished: " + msg);
+
+ if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
+ log.info("--- Send custom event: " + msg);
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
* Starts new grid with given index. Method optimize is not invoked.
*
* @param idx Index of the grid to start.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f3ef6a8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 88977fb..289da3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCacheTypesTest;
import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -195,6 +196,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheConfigurationDefaultTemplateTest.class);
suite.addTestSuite(IgniteDynamicClientCacheStartSelfTest.class);
suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class);
+ suite.addTestSuite(CacheAffinityEarlyTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
[2/6] ignite git commit: Fixed test.
Posted by sb...@apache.org.
Fixed test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/517d0f58
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/517d0f58
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/517d0f58
Branch: refs/heads/master
Commit: 517d0f584f67e9291b7f6f2efe3f42b7131f6a25
Parents: 04f4f54
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 23 15:43:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 23 15:43:13 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAbstractRemoveFailureTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/517d0f58/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
index 647746e..a3d9948 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
@@ -365,13 +365,13 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr
U.sleep(random(START_DELAY.get1(), START_DELAY.get2()));
- if (stop.get())
- return;
-
log.info("Restarting node " + idx);
startGrid(idx);
+ if (stop.get())
+ return;
+
U.sleep(1000);
}
[5/6] ignite git commit: IGNITE-1536 - Removed duplicated continuous
query notifications in REPLICATED cache
Posted by sb...@apache.org.
IGNITE-1536 - Removed duplicated continuous query notifications in REPLICATED cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db44f11
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db44f11
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db44f11
Branch: refs/heads/master
Commit: 7db44f11f7925b5a29a0a3e017baa93b52fb2982
Parents: 70a8a92
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Sep 23 18:53:06 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Sep 23 18:53:06 2015 -0700
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 58 +++------
.../continuous/GridContinuousProcessor.java | 3 +-
...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 4 +-
5 files changed, 144 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cc6c19a..ae96f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -556,7 +556,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
qry.getPageSize(),
qry.getTimeInterval(),
qry.isAutoUnsubscribe(),
- loc ? ctx.grid().cluster().forLocal() : null);
+ loc);
final QueryCursor<Cache.Entry<K, V>> cur =
qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
@@ -1896,4 +1896,4 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
@Override public String toString() {
return S.toString(IgniteCacheProxy.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c719f1e..6a151a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -43,10 +43,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
import org.jsr166.ConcurrentHashMap8;
@@ -271,7 +271,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param bufSize Buffer size.
* @param timeInterval Time interval.
* @param autoUnsubscribe Auto unsubscribe flag.
- * @param grp Cluster group.
+ * @param loc Local flag.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
@@ -280,7 +280,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
- ClusterGroup grp) throws IgniteCheckedException
+ boolean loc) throws IgniteCheckedException
{
return executeQuery0(
locLsnr,
@@ -293,7 +293,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
true,
false,
true,
- grp);
+ loc);
}
/**
@@ -321,7 +321,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
true,
false,
true,
- loc ? cctx.grid().cluster().forLocal() : null);
+ loc);
}
/**
@@ -383,7 +383,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param oldValRequired Old value required flag.
* @param sync Synchronous flag.
* @param ignoreExpired Ignore expired event flag.
- * @param grp Cluster group.
+ * @param loc Local flag.
* @return Continuous routine ID.
* @throws IgniteCheckedException In case of error.
*/
@@ -397,44 +397,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean oldValRequired,
boolean sync,
boolean ignoreExpired,
- ClusterGroup grp) throws IgniteCheckedException
+ boolean loc) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
- if (grp == null)
- grp = cctx.kernalContext().grid().cluster();
-
- Collection<ClusterNode> nodes = grp.nodes();
-
- if (nodes.isEmpty())
- throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " +
- "provided).");
-
- boolean skipPrimaryCheck = false;
-
- switch (cctx.config().getCacheMode()) {
- case LOCAL:
- if (!nodes.contains(cctx.localNode()))
- throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " +
- "only locally (provided projection contains remote nodes only).");
- else if (nodes.size() > 1)
- U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " +
- "ignored).");
-
- grp = grp.forNode(cctx.localNode());
-
- break;
-
- case REPLICATED:
- if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode()))
- skipPrimaryCheck = cctx.affinityNode();
-
- break;
- }
-
int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
cctx.kernalContext().job().currentTaskNameHash() : 0;
+ boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
+
GridContinuousHandler hnd = new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -448,12 +419,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
taskNameHash,
skipPrimaryCheck);
+ IgnitePredicate<ClusterNode> pred = null;
+
+ if (loc || cctx.config().getCacheMode() == CacheMode.LOCAL)
+ pred = F.nodeForNodeId(cctx.localNodeId());
+
UUID id = cctx.kernalContext().continuous().startRoutine(
hnd,
bufSize,
timeInterval,
autoUnsubscribe,
- grp.predicate()).get();
+ pred).get();
if (notifyExisting) {
final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -635,7 +611,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cfg.isOldValueRequired(),
cfg.isSynchronous(),
false,
- null);
+ false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 18c1f36..e29bdd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -795,7 +795,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate();
- ctx.resource().injectGeneric(prjPred);
+ if (prjPred != null)
+ ctx.resource().injectGeneric(prjPred);
if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
new file mode 100644
index 0000000..8152b2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for replicated cache with one node.
+ */
+public class GridCacheContinuousQueryReplicatedOneNodeSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(CacheMode.REPLICATED);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ doTest(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDistributed() throws Exception {
+ doTest(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest(boolean loc) throws Exception {
+ try {
+ IgniteCache<String, Integer> cache = startGrid(0).cache(null);
+
+ ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();
+
+ final AtomicInteger cnt = new AtomicInteger();
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
+ @Override
+ public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts) {
+ cnt.incrementAndGet();
+ latch.countDown();
+ }
+ }
+ });
+
+ cache.query(qry.setLocal(loc));
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 10; i++)
+ cache.put("key" + i, i);
+
+ assert latch.await(5000, TimeUnit.MILLISECONDS);
+
+ assertEquals(10, cnt.get());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 41670d1..fe54b63 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
@@ -158,6 +159,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class);
// Reduce fields queries.
suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
@@ -187,4 +189,4 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
return suite;
}
-}
\ No newline at end of file
+}
[6/6] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-1.4'
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1.4'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b56b15cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b56b15cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b56b15cd
Branch: refs/heads/master
Commit: b56b15cda7fc94b25400b1334b365053f0017f7f
Parents: b3bcf4a 7db44f1
Author: sboikov <sb...@gridgain.com>
Authored: Thu Sep 24 09:09:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Sep 24 09:09:46 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 4 +-
.../cache/DynamicCacheDescriptor.java | 10 +-
.../GridCachePartitionExchangeManager.java | 6 +
.../processors/cache/GridCacheProcessor.java | 18 +-
.../processors/cache/IgniteCacheProxy.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 66 ++-
.../continuous/GridContinuousProcessor.java | 3 +-
.../org/apache/ignite/mxbean/IgniteMXBean.java | 8 +-
.../communication/tcp/TcpCommunicationSpi.java | 7 +-
.../discovery/DiscoverySpiCustomMessage.java | 12 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 398 +++++++++++++------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 6 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 94 ++---
.../messages/TcpDiscoveryDiscardMessage.java | 15 +-
.../TcpDiscoveryNodeAddFinishedMessage.java | 2 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 19 +-
.../GridCacheAbstractRemoveFailureTest.java | 6 +-
.../distributed/CacheAffEarlySelfTest.java | 245 ------------
.../distributed/CacheAffinityEarlyTest.java | 168 ++++++++
...GridCacheValueConsistencyAtomicSelfTest.java | 2 +-
...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 ++++++
...lientDiscoverySpiFailureTimeoutSelfTest.java | 33 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 53 ++-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 315 ++++++++++++++-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../IgniteCacheQuerySelfTestSuite.java | 4 +-
28 files changed, 1099 insertions(+), 525 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b56b15cd/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java
----------------------------------------------------------------------
[4/6] ignite git commit: Exposed IgniteKernal.dumpDebugInfo() to MX
bean
Posted by sb...@apache.org.
Exposed IgniteKernal.dumpDebugInfo() to MX bean
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70a8a92d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70a8a92d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70a8a92d
Branch: refs/heads/master
Commit: 70a8a92da3cf39b0ed1cf18effd8fae1478cb2bd
Parents: 6f3ef6a
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Sep 23 17:23:31 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Sep 23 17:23:31 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/ignite/internal/IgniteKernal.java | 4 +---
.../src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java | 8 +++++++-
2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/70a8a92d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 82db059..60725e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3153,9 +3153,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return ctx.isDaemon() && U.hasAnnotation(comp.getClass(), SkipDaemon.class);
}
- /**
- *
- */
+ /** {@inheritDoc} */
public void dumpDebugInfo() {
U.warn(log, "Dumping debug info for node [id=" + ctx.localNodeId() +
", name=" + ctx.gridName() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/70a8a92d/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index 4755bf2..c30e0e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -366,4 +366,10 @@ public interface IgniteMXBean {
*/
@MXBeanDescription("Prints last suppressed errors.")
public void printLastErrors();
-}
\ No newline at end of file
+
+ /**
+ * Dumps debug information for the current node.
+ */
+ @MXBeanDescription("Dumps debug information for the current node.")
+ public void dumpDebugInfo();
+}