You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/19 17:00:37 UTC
[1/8] incubator-ignite git commit: Changed default timeout:
SocketWrite - 200 AckTimeout - 50 HeartbeatFrequency - 100 NetworkTimeout -
5000
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-23 a2a6f31fe -> 72d6ea5d1
Changed default timeout:
SocketWrite - 200
AckTimeout - 50
HeartbeatFrequency - 100
NetworkTimeout - 5000
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/df25d350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/df25d350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/df25d350
Branch: refs/heads/ignite-23
Commit: df25d35067beb8d5f40c97139eac0c06310b6666
Parents: c3dde57
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon May 18 11:41:41 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon May 18 11:41:41 2015 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/df25d350/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index b7e3cd5..802da02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -54,17 +54,17 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
/** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
public static final long DFLT_SOCK_TIMEOUT = 200;
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 200;
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 50;
- /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_NETWORK_TIMEOUT = 200;
+ /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
+ public static final long DFLT_NETWORK_TIMEOUT = 5000;
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
- /** Default heartbeat messages issuing frequency (value is <tt>300ms</tt>). */
- public static final long DFLT_HEARTBEAT_FREQ = 300;
+ /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
+ public static final long DFLT_HEARTBEAT_FREQ = 100;
/** Default size of topology snapshots history. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
[3/8] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-5' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/36805cc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/36805cc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/36805cc1
Branch: refs/heads/ignite-23
Commit: 36805cc1dd157b343301ff4661bbdf3db5b596ff
Parents: 489323b c3dde57
Author: avinogradov <av...@gridgain.com>
Authored: Mon May 18 12:55:06 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Mon May 18 12:55:06 2015 +0300
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 2 +-
.../org/apache/ignite/internal/IgnitionEx.java | 136 ++++---------
.../internal/interop/InteropBootstrap.java | 34 ++++
.../interop/InteropBootstrapFactory.java | 39 ++++
.../internal/interop/InteropIgnition.java | 166 ++++++++++++++++
.../internal/interop/InteropProcessor.java | 36 ++++
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheMapEntry.java | 35 +---
.../distributed/GridDistributedLockRequest.java | 111 ++---------
.../GridDistributedTxFinishRequest.java | 70 ++-----
.../GridDistributedTxPrepareRequest.java | 112 +++--------
.../GridDistributedTxRemoteAdapter.java | 20 +-
.../distributed/dht/GridDhtLockFuture.java | 2 -
.../distributed/dht/GridDhtLockRequest.java | 45 ++---
.../dht/GridDhtTransactionalCacheAdapter.java | 6 -
.../distributed/dht/GridDhtTxFinishFuture.java | 3 -
.../distributed/dht/GridDhtTxFinishRequest.java | 43 ++---
.../cache/distributed/dht/GridDhtTxLocal.java | 6 -
.../distributed/dht/GridDhtTxLocalAdapter.java | 68 +------
.../distributed/dht/GridDhtTxPrepareFuture.java | 18 +-
.../dht/GridDhtTxPrepareRequest.java | 60 +++---
.../cache/distributed/dht/GridDhtTxRemote.java | 8 +-
.../colocated/GridDhtColocatedLockFuture.java | 6 -
.../distributed/near/GridNearLockFuture.java | 6 -
.../distributed/near/GridNearLockRequest.java | 61 +++---
.../near/GridNearOptimisticTxPrepareFuture.java | 15 +-
.../GridNearPessimisticTxPrepareFuture.java | 2 -
.../near/GridNearTransactionalCache.java | 4 -
.../near/GridNearTxFinishRequest.java | 28 +--
.../cache/distributed/near/GridNearTxLocal.java | 20 +-
.../near/GridNearTxPrepareRequest.java | 52 +++--
.../distributed/near/GridNearTxRemote.java | 24 +--
.../cache/transactions/IgniteInternalTx.java | 10 -
.../transactions/IgniteTransactionsImpl.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 72 +------
.../cache/transactions/IgniteTxEntry.java | 48 +----
.../cache/transactions/IgniteTxHandler.java | 6 -
.../transactions/IgniteTxLocalAdapter.java | 165 ++--------------
.../cache/transactions/IgniteTxLocalEx.java | 21 +-
.../cache/transactions/IgniteTxManager.java | 62 +-----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 26 ---
.../near/IgniteCacheNearOnlyTxTest.java | 190 +++++++++++++++++++
.../processors/cache/jta/CacheJtaManager.java | 4 +-
43 files changed, 744 insertions(+), 1110 deletions(-)
----------------------------------------------------------------------
[4/8] incubator-ignite git commit: Merge branch 'ignite-timeout' into
ignite-sprint-5
Posted by sb...@apache.org.
Merge branch 'ignite-timeout' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15d55b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15d55b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15d55b1a
Branch: refs/heads/ignite-23
Commit: 15d55b1a5fcaee7c91f450426bc537f3dffd77cf
Parents: 36805cc df25d35
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Mon May 18 14:49:20 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Mon May 18 14:49:20 2015 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[2/8] incubator-ignite git commit: GG-7190
Posted by sb...@apache.org.
GG-7190
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/489323b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/489323b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/489323b0
Branch: refs/heads/ignite-23
Commit: 489323b0175f6eba897caf3f40f4f0fcb970df19
Parents: 04774b5f
Author: avinogradov <av...@gridgain.com>
Authored: Mon May 18 12:54:41 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Mon May 18 12:54:41 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/489323b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 5738778..7fa0a03 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -64,6 +64,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridCacheNearMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCacheAtomicNearMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearReadersSelfTest.class));
+ suite.addTest(new TestSuite(GridCacheNearReaderPreloadSelfTest.class));
suite.addTest(new TestSuite(GridCacheAtomicNearReadersSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAffinitySelfTest.class));
suite.addTest(new TestSuite(GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.class));
[8/8] incubator-ignite git commit: # ignite-23 skip client nodes from
partition exchange
Posted by sb...@apache.org.
# ignite-23 skip client nodes from partition exchange
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/72d6ea5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/72d6ea5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/72d6ea5d
Branch: refs/heads/ignite-23
Commit: 72d6ea5d1d772594d9ef4c567b2011024268475b
Parents: d6f5f15
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 19 14:49:37 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 19 17:47:15 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 5 +
.../ignite/internal/GridKernalContextImpl.java | 5 +
.../discovery/GridDiscoveryManager.java | 64 ++-
.../GridCachePartitionExchangeManager.java | 76 ++-
.../processors/cache/GridCachePreloader.java | 4 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../cache/GridCacheSharedContext.java | 1 +
.../processors/cache/GridCacheUtils.java | 58 +--
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../preloader/GridDhtPartitionDemandPool.java | 20 +-
.../preloader/GridDhtPartitionSupplyPool.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 223 +++++----
.../preloader/GridDhtPartitionsFullMessage.java | 4 +-
.../GridDhtPartitionsSingleMessage.java | 33 +-
.../dht/preloader/GridDhtPreloader.java | 14 +-
.../preloader/GridDhtPreloaderAssignments.java | 3 +-
.../GridCacheAbstractRemoveFailureTest.java | 20 +
...niteCacheClientNodeChangingTopologyTest.java | 42 ++
.../IgniteCacheClientNodeExchangeTest.java | 184 -------
...teCacheClientNodePartitionsExchangeTest.java | 486 +++++++++++++++++++
.../GridCacheDhtClientRemoveFailureTest.java | 28 ++
...cClientInvalidPartitionHandlingSelfTest.java | 29 ++
.../GridCacheAtomicClientRemoveFailureTest.java | 28 ++
...eAtomicInvalidPartitionHandlingSelfTest.java | 12 +
.../IgniteCacheFailoverTestSuite.java | 3 +
.../testsuites/IgniteCacheTestSuite2.java | 2 +-
26 files changed, 985 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ad7d562..d6542f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Marshaller context.
*/
public MarshallerContextImpl marshallerContext();
+
+ /**
+ * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+ */
+ public boolean clientNode();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1ff483e..f921d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public boolean clientNode() {
+ return cfg.isClientMode();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 62548d8..7130421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1246,13 +1246,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets alive remote nodes with at least one cache configured.
+ * Gets alive remote server nodes with at least one cache configured.
*
* @param topVer Topology version (maximum allowed node order).
* @return Collection of alive cache nodes.
*/
- public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion());
+ public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
+ }
+
+ /**
+ * Gets alive server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version (maximum allowed node order).
+ * @return Collection of alive cache nodes.
+ */
+ public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
}
/**
@@ -1350,7 +1360,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// Find the eldest acceptable discovery cache.
Map.Entry<AffinityTopologyVersion, DiscoCache> eldest = Collections.min(discoCacheHist.entrySet(), histCmp);
- if (topVer.compareTo(eldest.getKey()) < 0)
+ if (topVer.compareTo(eldest.getKey()) <= 0)
cache = eldest.getValue();
}
@@ -2094,9 +2104,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final Collection<ClusterNode> aliveNodesWithCaches;
/**
- * Cached alive remote nodes with caches.
+ * Cached alive server nodes with caches.
+ */
+ private final Collection<ClusterNode> aliveSrvNodesWithCaches;
+
+ /**
+ * Cached alive remote server nodes with caches.
*/
- private final Collection<ClusterNode> aliveRmtNodesWithCaches;
+ private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
/**
* @param loc Local node.
@@ -2131,7 +2146,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
@@ -2183,8 +2199,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (alive(node.id())) {
aliveNodesWithCaches.add(node);
- if (!loc.id().equals(node.id()))
- aliveRmtNodesWithCaches.add(node);
+ if (!CU.clientNode(node)) {
+ aliveSrvNodesWithCaches.add(node);
+
+ if (!loc.id().equals(node.id()))
+ aliveRmtSrvNodesWithCaches.add(node);
+ }
}
}
@@ -2269,13 +2289,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * @return All nodes with at least one cache configured.
- */
- Collection<ClusterNode> allNodesWithCaches() {
- return allNodesWithCaches;
- }
-
- /**
* Gets collection of nodes which have version equal or greater than {@code ver}.
*
* @param ver Version to check.
@@ -2374,13 +2387,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all alive remote nodes with at least one cache configured.
+ * Gets all alive remote server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveRmtSrvNodesWithCaches);
+ }
+
+ /**
+ * Gets all alive server nodes with at least one cache configured.
*
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) {
- return filter(topVer, aliveRmtNodesWithCaches);
+ Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveSrvNodesWithCaches);
}
/**
@@ -2417,7 +2440,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
filterNodeMap(aliveRmtCacheNodes, leftNode);
aliveNodesWithCaches.remove(leftNode);
- aliveRmtNodesWithCaches.remove(leftNode);
+ aliveSrvNodesWithCaches.remove(leftNode);
+ aliveRmtSrvNodesWithCaches.remove(leftNode);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c399c23..cc06d4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -554,7 +554,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Partition refresh callback.
*/
void refreshPartitions() {
- ClusterNode oldest = CU.oldest(cctx);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+ if (oldest == null) {
+ if (log.isDebugEnabled())
+ log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']');
+
+ return;
+ }
if (log.isDebugEnabled())
log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
@@ -641,7 +648,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ cctx.kernalContext().clientNode(),
+ cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
@@ -687,6 +696,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param reqs Cache change requests.
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@@ -827,7 +837,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param node Node ID.
* @param msg Message.
*/
- private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
if (!enterBusy())
return;
@@ -858,8 +868,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (updated)
scheduleResendPartitions();
}
- else
- exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+ else {
+ if (msg.client()) {
+ IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion());
+
+ if (fut != null) {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ processSinglePartitionClientUpdate(node, msg);
+ }
+ });
+ }
+ else
+ processSinglePartitionClientUpdate(node, msg);
+ }
+ else
+ exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+ }
}
finally {
leaveBusy();
@@ -867,6 +892,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param node Node.
+ * @param msg Message.
+ */
+ private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ null,
+ null);
+
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ // Finished future should reply only to sender client node.
+ exchFut.onReceive(node.id(), msg);
+ }
+ });
+ }
+
+ /**
* @param node Node ID.
* @param msg Message.
*/
@@ -982,7 +1024,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
busy = true;
- Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>();
+ Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
boolean dummyReassign = exchFut.dummyReassign();
boolean forcePreload = exchFut.forcePreload();
@@ -1017,7 +1059,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
changed |= cacheCtx.topology().afterExchange(exchFut);
// Preload event notification.
- if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
+ if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
if (!cacheCtx.isReplicated() || !startEvtFired) {
DiscoveryEvent discoEvt = exchFut.discoveryEvent();
@@ -1043,16 +1085,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- long delay = cacheCtx.config().getRebalanceDelay();
+ if (!exchFut.skipPreload()) {
+ assignsMap = new HashMap<>();
- GridDhtPreloaderAssignments<K, V> assigns = null;
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ long delay = cacheCtx.config().getRebalanceDelay();
+
+ GridDhtPreloaderAssignments assigns = null;
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if (delay == 0 || forcePreload)
- assigns = cacheCtx.preloader().assign(exchFut);
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ if (delay == 0 || forcePreload)
+ assigns = cacheCtx.preloader().assign(exchFut);
- assignsMap.put(cacheCtx.cacheId(), assigns);
+ assignsMap.put(cacheCtx.cacheId(), assigns);
+ }
}
}
finally {
@@ -1061,7 +1107,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (assignsMap != null) {
- for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) {
+ for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
int cacheId = e.getKey();
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 2e181f9..5a73843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> {
* @param exchFut Exchange future to assign.
* @return Assignments.
*/
- public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut);
+ public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
/**
* Adds assignments to preloader.
@@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> {
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
*/
- public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload);
+ public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
/**
* @param p Preload predicate.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 80d3d6b..8cd5264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -131,12 +131,12 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
return null;
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 294c2b0..4c08beb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -499,6 +499,7 @@ public class GridCacheSharedContext<K, V> {
/**
* @param tx Transaction to rollback.
* @throws IgniteCheckedException If failed.
+ * @return Rollback future.
*/
public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
Collection<Integer> cacheIds = tx.activeCacheIds();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index ef04ff4..b7bc115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -527,8 +527,9 @@ public class GridCacheUtils {
* @param topOrder Maximum allowed node order.
* @return Affinity nodes.
*/
- public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().aliveRemoteNodesWithCaches(topOrder);
+ public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx,
+ AffinityTopologyVersion topOrder) {
+ return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder);
}
/**
@@ -607,26 +608,6 @@ public class GridCacheUtils {
* Gets oldest alive node for specified topology version.
*
* @param cctx Cache context.
- * @return Oldest node for the current topology version.
- */
- public static ClusterNode oldest(GridCacheContext cctx) {
- return oldest(cctx, AffinityTopologyVersion.NONE);
- }
-
- /**
- * Gets oldest alive node across nodes with at least one cache configured.
- *
- * @param ctx Cache context.
- * @return Oldest node.
- */
- public static ClusterNode oldest(GridCacheSharedContext ctx) {
- return oldest(ctx, AffinityTopologyVersion.NONE);
- }
-
- /**
- * Gets oldest alive node for specified topology version.
- *
- * @param cctx Cache context.
* @param topOrder Maximum allowed node order.
* @return Oldest node for the given topology version.
*/
@@ -665,6 +646,23 @@ public class GridCacheUtils {
}
/**
+ * Gets oldest alive server node with at least one cache configured for specified topology version.
+ *
+ * @param ctx Context.
+ * @param topVer Maximum allowed topology version.
+ * @return Oldest alive cache server node.
+ */
+ @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
+ AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
+
+ if (nodes.isEmpty())
+ return null;
+
+ return oldest(nodes);
+ }
+
+ /**
* @param nodes Nodes.
* @return Oldest node for the given topology version.
*/
@@ -1802,16 +1800,22 @@ public class GridCacheUtils {
/**
* @param node Node.
- * @param filter Node filter.
- * @return {@code True} if node is not client node and pass given filter.
+ * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
*/
- public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+ public static boolean clientNode(ClusterNode node) {
Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
assert clientModeAttr != null : node;
- boolean clientMode = clientModeAttr != null && clientModeAttr;
+ return clientModeAttr != null && clientModeAttr;
+ }
- return !clientMode && filter.apply(node);
+ /**
+ * @param node Node.
+ * @param filter Node filter.
+ * @return {@code True} if node is not client node and pass given filter.
+ */
+ public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+ return !clientNode(node) && filter.apply(node);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..56f6a62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldest(cctx.shared(), topVer);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ assert oldest != null;
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..1071468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
* and populating local cache.
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
/** Dummy message to wake up a blocking queue if a node leaves. */
private final SupplyMessage DUMMY_TOP = new SupplyMessage();
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
@@ -327,7 +327,7 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
*/
- void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+ void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -399,7 +399,7 @@ public class GridDhtPartitionDemandPool<K, V> {
private int id;
/** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
/** Message queue. */
private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +425,7 @@ public class GridDhtPartitionDemandPool<K, V> {
/**
* @param assigns Assignments.
*/
- void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+ void addAssignments(GridDhtPreloaderAssignments assigns) {
assert assigns != null;
assignQ.offer(assigns);
@@ -885,7 +885,7 @@ public class GridDhtPartitionDemandPool<K, V> {
}
// Sync up all demand threads at this step.
- GridDhtPreloaderAssignments<K, V> assigns = null;
+ GridDhtPreloaderAssignments assigns = null;
while (assigns == null)
assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +995,12 @@ public class GridDhtPartitionDemandPool<K, V> {
* @param exchFut Exchange future.
* @return Assignments of partitions to nodes.
*/
- GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = cctx.dht().topology();
if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
int partCnt = cctx.affinity().partitions();
@@ -1009,7 +1009,7 @@ public class GridDhtPartitionDemandPool<K, V> {
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
", topVer=" + top.topologyVersion() + ']';
- GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..84ac7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
* Thread pool for supplying partitions to demanding nodes.
*/
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
/** */
- private final GridCacheContext<K, V> cctx;
+ private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
* @param cctx Cache context.
* @param busyLock Shutdown lock.
*/
- GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+ GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
assert cctx != null;
assert busyLock != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f4dcf3b..102176e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
/**
@@ -118,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private GridFutureAdapter<Boolean> initFut;
/** Topology snapshot. */
- private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
- new AtomicReference<>();
+ private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -150,6 +150,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Cache validation results. */
private volatile Map<Integer, Boolean> cacheValidRes;
+ /** Skip preload flag. */
+ private boolean skipPreload;
+
/**
* Dummy future created to trigger reassignments if partition
* topology changed while preloading.
@@ -227,23 +230,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
initFut = new GridFutureAdapter<>();
// Grab all nodes with order of equal or less than last joined node.
- Collection<ClusterNode> nodes = CU.aliveCacheNodes(cctx, exchId.topologyVersion());
-
- if (nodes.isEmpty()) {
- initFut.onDone(true);
+ ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
- onDone(exchId.topologyVersion());
-
- return;
- }
-
- oldestNode.set(CU.oldest(nodes));
-
- assert oldestNode.get() != null;
+ oldestNode.set(node);
if (log.isDebugEnabled())
- log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
- ", fut=" + this + ']');
+ log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
}
/** {@inheritDoc} */
@@ -263,6 +255,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
+ * @return Skip preload flag.
+ */
+ public boolean skipPreload() {
+ return skipPreload;
+ }
+
+ /**
* @return Dummy flag.
*/
public boolean dummy() {
@@ -415,13 +414,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @return Exchange id.
- */
- GridDhtPartitionExchangeId key() {
- return exchId;
- }
-
- /**
* @return Exchange ID.
*/
public GridDhtPartitionExchangeId exchangeId() {
@@ -429,13 +421,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @return Init future.
- */
- IgniteInternalFuture<?> initFuture() {
- return initFut;
- }
-
- /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -464,8 +449,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (isDone())
return;
- assert oldestNode.get() != null;
-
if (init.compareAndSet(false, true)) {
if (isDone())
return;
@@ -475,63 +458,98 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// will return corresponding nodes.
U.await(evtLatch);
- if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave.
- assert discoEvt != null;
+ assert discoEvt != null : this;
+ assert !dummy && !forcePreload : this;
+ startCaches();
+
+ // True if client node joined or failed.
+ boolean clientNodeEvt;
+
+ if (F.isEmpty(reqs)) {
int type = discoEvt.type();
assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
- ClusterNode node = discoEvt.eventNode();
+ clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+ }
+ else {
+ assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+
+ clientNodeEvt = false;
+ }
- if (!node.isLocal()) {
- boolean affNode = false;
+ if (clientNodeEvt) {
+ ClusterNode node = discoEvt.eventNode();
+ if (!node.isLocal()) { // Client need to initialize affinity for local join event.
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
- if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) {
- affNode = true;
+ cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
- break;
- }
+ GridDhtPartitionTopology top = cacheCtx.topology();
+
+ GridDhtPartitionMap parts = top.partitions(node.id());
+
+ assert parts == null || parts.size() == 0 : parts;
+
+ top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
}
- if (!affNode) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ if (exchId.isLeft())
+ cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
- cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+ onDone(exchId.topologyVersion());
- GridDhtPartitionTopology top = cacheCtx.topology();
+ skipPreload = true;
- GridDhtPartitionMap parts = top.partitions(node.id());
+ return;
+ }
+ }
- assert parts == null || parts.size() == 0 : parts;
+ if (cctx.kernalContext().clientNode()) {
+ skipPreload = true;
- top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
- }
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- if (!exchId.isLeft()) {
- rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node));
+ GridDhtPartitionTopology top = cacheCtx.topology();
- rmtIds = F.asList(node.id());
- }
+ top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+ }
- ready.set(true);
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- initFut.onDone(true);
+ initTopology(cacheCtx);
+ }
- onDone(exchId.topologyVersion());
+ if (oldestNode.get() != null) {
+ rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
+ exchId.topologyVersion()));
- return;
- }
+ rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+
+ ready.set(true);
+
+ initFut.onDone(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized future: " + this);
+
+ sendPartitions();
}
+ else
+ onDone(exchId.topologyVersion());
+
+ return;
}
- startCaches();
+ assert oldestNode.get() != null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (isCacheAdded(cacheCtx.cacheId())) {
@@ -614,7 +632,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
// Grab all alive remote nodes with order of equal or less than last joined node.
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+ rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
exchId.topologyVersion()));
rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -821,7 +839,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ cctx.kernalContext().clientNode(), cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
@@ -1091,9 +1110,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
", unexpectedNodeId=" + nodeId + ']');
- ClusterNode sender = cctx.discovery().node(nodeId);
+ ClusterNode snd = cctx.discovery().node(nodeId);
- if (sender == null) {
+ if (snd == null) {
if (log.isDebugEnabled())
log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
", exchId=" + msg.exchangeId() + ']');
@@ -1102,7 +1121,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
// Will process message later if sender node becomes oldest node.
- if (sender.order() > curOldest.order())
+ if (snd.order() > curOldest.order())
fullMsgs.put(nodeId, msg);
return;
@@ -1141,8 +1160,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx != null)
cacheCtx.topology().update(exchId, entry.getValue());
- else if (CU.oldest(cctx).isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ else {
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+ if (oldest != null && oldest.isLocal())
+ cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+ }
}
}
@@ -1201,40 +1224,47 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean set = false;
- ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
-
- // If local node is now oldest.
- if (newOldest.id().equals(cctx.localNodeId())) {
- synchronized (mux) {
- if (oldestNode.compareAndSet(oldest, newOldest)) {
- // If local node is just joining.
- if (exchId.nodeId().equals(cctx.localNodeId())) {
- try {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
- cacheCtx.topology().beforeExchange(
- GridDhtPartitionsExchangeFuture.this);
+ for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
+ if (it.next().id().equals(nodeId))
+ it.remove();
+ }
+
+ ClusterNode newOldest = CU.oldest(rmtNodes);
+
+ if (newOldest != null) {
+ // If local node is now oldest.
+ if (newOldest.id().equals(cctx.localNodeId())) {
+ synchronized (mux) {
+ if (oldestNode.compareAndSet(oldest, newOldest)) {
+ // If local node is just joining.
+ if (exchId.nodeId().equals(cctx.localNodeId())) {
+ try {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal())
+ cacheCtx.topology().beforeExchange(
+ GridDhtPartitionsExchangeFuture.this);
+ }
}
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ catch (IgniteCheckedException e) {
+ onDone(e);
- return;
+ return;
+ }
}
- }
- set = true;
+ set = true;
+ }
}
}
- }
- else {
- synchronized (mux) {
- set = oldestNode.compareAndSet(oldest, newOldest);
- }
+ else {
+ synchronized (mux) {
+ set = oldestNode.compareAndSet(oldest, newOldest);
+ }
- if (set && log.isDebugEnabled())
- log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
- ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+ if (set && log.isDebugEnabled())
+ log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
+ ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+ }
}
if (set) {
@@ -1256,9 +1286,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert rmtNodes != null;
- for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); )
+ for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
if (it.next().id().equals(nodeId))
it.remove();
+ }
if (allReceived() && ready.get() && replied.compareAndSet(false, true))
if (spreadPartitions())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8256274..73794ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param id Exchange ID.
* @param lastVer Last version.
+ * @param topVer Topology version.
*/
- public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
+ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
+ @Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer) {
super(id, lastVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 66140cd..713a80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** Serialized partitions. */
private byte[] partsBytes;
+ /** */
+ private boolean client;
+
/**
* Required by {@link Externalizable}.
*/
@@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/**
* @param exchId Exchange ID.
+ * @param client Client message flag.
* @param lastVer Last version.
*/
- public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) {
+ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
+ boolean client,
+ @Nullable GridCacheVersion lastVer) {
super(exchId, lastVer);
+
+ this.client = client;
+ }
+
+ /**
+ * @return {@code True} if sent from client node.
+ */
+ public boolean client() {
+ return client;
}
/**
@@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
switch (writer.state()) {
case 5:
+ if (!writer.writeBoolean("client", client))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
switch (reader.state()) {
case 5:
+ client = reader.readBoolean("client");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())
@@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d6373f0..61ba8b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -60,10 +60,10 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool<K, V> supplyPool;
+ private GridDhtPartitionSupplyPool supplyPool;
/** Partition demanders. */
- private GridDhtPartitionDemandPool<K, V> demandPool;
+ private GridDhtPartitionDemandPool demandPool;
/** Start future. */
private final GridFutureAdapter<Object> startFut;
@@ -158,8 +158,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
}
});
- supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock);
+ supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
+ demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -253,12 +253,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
return demandPool.assign(exchFut);
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
demandPool.addAssignments(assignments, forcePreload);
}
@@ -271,7 +271,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>() : demandPool.syncFuture();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 369fc68..2f6ef6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -27,8 +27,7 @@ import java.util.concurrent.*;
/**
* Partition to node assignments.
*/
-public class GridDhtPreloaderAssignments<K, V> extends
- ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
+public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
index c6ede61..d5d80ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -71,6 +72,16 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
private String sizePropVal;
/** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (testClientNode() && getTestGridName(0).equals(gridName))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
@Override protected int gridCount() {
return GRID_CNT;
}
@@ -106,9 +117,18 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
}
/**
+ * @return {@code True} if test updates from client node.
+ */
+ protected boolean testClientNode() {
+ return false;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testPutAndRemove() throws Exception {
+ assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode());
+
final IgniteCache<Integer, Integer> sndCache0 = grid(0).cache(null);
final AtomicBoolean stop = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
new file mode 100644
index 0000000..c233bb9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
deleted file mode 100644
index 66db3c6..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.eclipse.jetty.util.*;
-
-import java.util.*;
-
-/**
- *
- */
-public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest {
- /** */
- protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private boolean client;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
- cfg.setClientMode(client);
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- cfg.setCacheConfiguration(ccfg);
-
- cfg.setCommunicationSpi(new TestCommunicationSpi());
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNoPartitionExchangeForClient() throws Exception {
- Ignite ignite0 = startGrid(0);
-
- TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
-
- Ignite ignite1 = startGrid(1);
-
- TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
-
- assertEquals(0, spi0.partitionsSingleMessages().size());
- assertEquals(1, spi0.partitionsFullMessages().size());
-
- assertEquals(1, spi1.partitionsSingleMessages().size());
- assertEquals(0, spi1.partitionsFullMessages().size());
-
- spi0.reset();
- spi1.reset();
-
- client = true;
-
- for (int i = 0; i < 3; i++) {
- log.info("Start client node: " + i);
-
- Ignite ignite2 = startGrid(2);
-
- TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
-
- assertEquals(0, spi0.partitionsSingleMessages().size());
- assertEquals(1, spi0.partitionsFullMessages().size());
-
- assertEquals(0, spi1.partitionsSingleMessages().size());
- assertEquals(0, spi1.partitionsFullMessages().size());
-
- assertEquals(1, spi2.partitionsSingleMessages().size());
- assertEquals(0, spi2.partitionsFullMessages().size());
-
- spi0.reset();
- spi1.reset();
- spi2.reset();
-
- log.info("Stop client node.");
-
- ignite2.close();
-
- assertEquals(0, spi0.partitionsSingleMessages().size());
- assertEquals(0, spi0.partitionsFullMessages().size());
-
- assertEquals(0, spi1.partitionsSingleMessages().size());
- assertEquals(0, spi1.partitionsFullMessages().size());
- }
- }
-
- /**
- * Test communication SPI.
- */
- private static class TestCommunicationSpi extends TcpCommunicationSpi {
- /** */
- private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>();
-
- /** */
- private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>();
-
- /** */
- @LoggerResource
- private IgniteLogger log;
-
- /** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg) {
- super.sendMessage(node, msg);
-
- Object msg0 = ((GridIoMessage)msg).message();
-
- if (msg0 instanceof GridDhtPartitionsSingleMessage) {
- if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) {
- log.info("Partitions message: " + msg0.getClass().getSimpleName());
-
- partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0);
- }
- }
- else if (msg0 instanceof GridDhtPartitionsFullMessage) {
- if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) {
- log.info("Partitions message: " + msg0.getClass().getSimpleName());
-
- partFullMsgs.add((GridDhtPartitionsFullMessage) msg0);
- }
- }
- }
-
- /**
- *
- */
- void reset() {
- partSingleMsgs.clear();
- partFullMsgs.clear();
- }
-
- /**
- * @return Sent partitions single messages.
- */
- Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() {
- return partSingleMsgs;
- }
-
- /**
- * @return Sent partitions full messages.
- */
- Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() {
- return partFullMsgs;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
new file mode 100644
index 0000000..3fac400
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
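+/**
+ * Tests partitions exchange, rebalance events and cache gets when client or server nodes join and leave.
+ */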
+public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstractTest {
+ /** IP finder instance set on the discovery SPI of every node configuration. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Value passed to {@code setClientMode} for every node configured while it is set. */
+ private boolean client;
+
+ /** If {@code true}, the cache is configured with {@link FairAffinityFunction}. */
+ private boolean fairAffinity;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ if (fairAffinity)
+ ccfg.setAffinity(new FairAffinityFunction());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeLeave() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ client = true;
+
+ final Ignite ignite1 = startGrid(1);
+
+ waitForTopologyUpdate(2, 2);
+
+ final Ignite ignite2 = startGrid(2);
+
+ waitForTopologyUpdate(3, 3);
+
+ ignite0.close();
+
+ waitForTopologyUpdate(2, 4);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite1.cache(null).get(1);
+
+ return null;
+ }
+ }, CacheServerNotFoundException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite2.cache(null).get(1);
+
+ return null;
+ }
+ }, CacheServerNotFoundException.class, null);
+
+ ignite1.close();
+
+ waitForTopologyUpdate(1, 5);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite2.cache(null).get(1);
+
+ return null;
+ }
+ }, CacheServerNotFoundException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSkipPreload() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ final CountDownLatch evtLatch0 = new CountDownLatch(1);
+
+ ignite0.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ log.info("Rebalance event: " + evt);
+
+ evtLatch0.countDown();
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+ client = true;
+
+ Ignite ignite1 = startGrid(1);
+
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+ ignite1.close();
+
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+ ignite1 = startGrid(1);
+
+ final CountDownLatch evtLatch1 = new CountDownLatch(1);
+
+ ignite1.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ log.info("Rebalance event: " + evt);
+
+ evtLatch1.countDown();
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+ client = false;
+
+ startGrid(2);
+
+ assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+ assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionsExchange() throws Exception {
+ partitionsExchange();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionsExchangeFairAffinity() throws Exception {
+ fairAffinity = true;
+
+ partitionsExchange();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void partitionsExchange() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+ Ignite ignite1 = startGrid(1);
+
+ waitForTopologyUpdate(2, 2);
+
+ TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(1, spi0.partitionsFullMessages());
+
+ assertEquals(1, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+
+ client = true;
+
+ log.info("Start client node1.");
+
+ Ignite ignite2 = startGrid(2);
+
+ waitForTopologyUpdate(3, 3);
+
+ TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(1, spi0.partitionsFullMessages());
+
+ assertEquals(0, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ assertEquals(1, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+
+ log.info("Start client node2.");
+
+ Ignite ignite3 = startGrid(3);
+
+ waitForTopologyUpdate(4, 4);
+
+ TestCommunicationSpi spi3 = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(1, spi0.partitionsFullMessages());
+
+ assertEquals(0, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ assertEquals(0, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ assertEquals(1, spi3.partitionsSingleMessages());
+ assertEquals(0, spi3.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+ spi3.reset();
+
+ log.info("Start one more server node.");
+
+ client = false;
+
+ Ignite ignite4 = startGrid(4);
+
+ waitForTopologyUpdate(5, 5);
+
+ TestCommunicationSpi spi4 = (TestCommunicationSpi)ignite4.configuration().getCommunicationSpi();
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(4, spi0.partitionsFullMessages());
+
+ assertEquals(1, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ assertEquals(1, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ assertEquals(1, spi3.partitionsSingleMessages());
+ assertEquals(0, spi3.partitionsFullMessages());
+
+ assertEquals(1, spi4.partitionsSingleMessages());
+ assertEquals(0, spi4.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+ spi3.reset();
+
+ log.info("Stop server node.");
+
+ ignite4.close();
+
+ waitForTopologyUpdate(4, 6);
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(3, spi0.partitionsFullMessages());
+
+ assertEquals(1, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ assertEquals(1, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ assertEquals(1, spi3.partitionsSingleMessages());
+ assertEquals(0, spi3.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+ spi2.reset();
+
+ log.info("Stop client node2.");
+
+ ignite3.close();
+
+ waitForTopologyUpdate(3, 7);
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(0, spi0.partitionsFullMessages());
+
+ assertEquals(0, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ assertEquals(0, spi2.partitionsSingleMessages());
+ assertEquals(0, spi2.partitionsFullMessages());
+
+ spi0.reset();
+ spi1.reset();
+
+ log.info("Stop client node1.");
+
+ ignite2.close();
+
+ waitForTopologyUpdate(2, 8);
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(0, spi0.partitionsFullMessages());
+
+ assertEquals(0, spi1.partitionsSingleMessages());
+ assertEquals(0, spi1.partitionsFullMessages());
+
+ log.info("Stop server node.");
+
+ ignite1.close();
+
+ waitForTopologyUpdate(1, 9);
+
+ assertEquals(0, spi0.partitionsSingleMessages());
+ assertEquals(0, spi0.partitionsFullMessages());
+ }
+
+ /**
+ * @param expNodes Expected number of nodes.
+ * @param topVer Expected topology version.
+ * @throws Exception If failed.
+ */
+ private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception {
+ List<Ignite> nodes = G.allGrids();
+
+ assertEquals(expNodes, nodes.size());
+
+ final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
+
+ for (Ignite ignite : nodes) {
+ final IgniteKernal kernal = (IgniteKernal)ignite;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
+ }
+ }, 10_000);
+
+ assertEquals("Unexpected affinity version for " + ignite.name(),
+ ver,
+ kernal.context().cache().context().exchange().readyAffinityVersion());
+ }
+
+ Iterator<Ignite> it = nodes.iterator();
+
+ Ignite ignite0 = it.next();
+
+ Affinity<Integer> aff0 = ignite0.affinity(null);
+
+ while (it.hasNext()) {
+ Ignite ignite = it.next();
+
+ Affinity<Integer> aff = ignite.affinity(null);
+
+ assertEquals(aff0.partitions(), aff.partitions());
+
+ for (int part = 0; part < aff.partitions(); part++)
+ assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), aff.mapPartitionToPrimaryAndBackups(part));
+ }
+
+ for (Ignite ignite : nodes) {
+ final IgniteKernal kernal = (IgniteKernal)ignite;
+
+ for (IgniteInternalCache cache : kernal.context().cache().caches()) {
+ GridDhtPartitionTopology top = cache.context().topology();
+
+ assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']',
+ ver,
+ top.topologyVersion());
+ }
+ }
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Communication SPI that counts sent partitions single/full messages having a non-null exchange ID.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** Number of sent partitions single messages with non-null exchange ID. */
+ private AtomicInteger partSingleMsgs = new AtomicInteger();
+
+ /** Number of sent partitions full messages with non-null exchange ID. */
+ private AtomicInteger partFullMsgs = new AtomicInteger();
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) {
+ super.sendMessage(node, msg);
+
+ Object msg0 = ((GridIoMessage)msg).message();
+
+ if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+ if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) {
+ log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+ partSingleMsgs.incrementAndGet();
+ }
+ }
+ else if (msg0 instanceof GridDhtPartitionsFullMessage) {
+ if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) {
+ log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+ partFullMsgs.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Resets both message counters to zero.
+ */
+ void reset() {
+ partSingleMsgs.set(0);
+ partFullMsgs.set(0);
+ }
+
+ /**
+ * @return Number of partitions single messages counted since the last reset.
+ */
+ int partitionsSingleMessages() {
+ return partSingleMsgs.get();
+ }
+
+ /**
+ * @return Number of partitions full messages counted since the last reset.
+ */
+ int partitionsFullMessages() {
+ return partFullMsgs.get();
+ }
+ }
+
+}
[5/8] incubator-ignite git commit: # Minor changes.
Posted by sb...@apache.org.
# Minor changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ba7fddb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ba7fddb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ba7fddb0
Branch: refs/heads/ignite-23
Commit: ba7fddb004a840fdad66bc9fd127eafda27fee2f
Parents: 15d55b1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon May 18 17:45:38 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon May 18 17:45:38 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/managers/communication/GridIoManager.java | 6 +++---
.../communication/GridLifecycleAwareMessageFilter.java | 5 ++++-
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba7fddb0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 16d582b..c877d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1697,10 +1697,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
this.predLsnr = predLsnr;
if (predLsnr != null) {
- ctx.resource().injectGeneric(predLsnr);
-
if (predLsnr instanceof GridLifecycleAwareMessageFilter)
- ((GridLifecycleAwareMessageFilter)predLsnr).initialize();
+ ((GridLifecycleAwareMessageFilter)predLsnr).initialize(ctx);
+ else
+ ctx.resource().injectGeneric(predLsnr);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba7fddb0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
index cb99d2e..f8cd78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.managers.communication;
+import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
/**
@@ -25,8 +26,10 @@ import org.apache.ignite.lang.*;
public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> {
/**
* Initializes the filter.
+ *
+ * @param ctx Kernal context.
*/
- public void initialize();
+ public void initialize(GridKernalContext ctx);
/**
* Closes the filter.
[7/8] incubator-ignite git commit: # ignite-23 skip client nodes from
partition exchange
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtClientRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtClientRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtClientRemoveFailureTest.java
new file mode 100644
index 0000000..09c643b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtClientRemoveFailureTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+/**
+ * {@link GridCacheDhtRemoveFailureTest} variant whose {@code testClientNode()} returns {@code true}.
+ */
+public class GridCacheDhtClientRemoveFailureTest extends GridCacheDhtRemoveFailureTest {
+ /** {@inheritDoc} */
+ @Override protected boolean testClientNode() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientInvalidPartitionHandlingSelfTest.java
new file mode 100644
index 0000000..64414a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientInvalidPartitionHandlingSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+/**
+ * Runs the parent test with {@code testClientNode()} returning {@code true}, so grid 0 starts in client mode.
+ */
+public class GridCacheAtomicClientInvalidPartitionHandlingSelfTest
+ extends GridCacheAtomicInvalidPartitionHandlingSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean testClientNode() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientRemoveFailureTest.java
new file mode 100644
index 0000000..2edb125
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicClientRemoveFailureTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+/**
+ * {@link GridCacheAtomicRemoveFailureTest} variant whose {@code testClientNode()} returns {@code true}.
+ */
+public class GridCacheAtomicClientRemoveFailureTest extends GridCacheAtomicRemoveFailureTest {
+ /** {@inheritDoc} */
+ @Override protected boolean testClientNode() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index a68423b..2067af8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -78,6 +78,9 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
cfg.setCommunicationSpi(new DelayCommunicationSpi());
+ if (testClientNode() && getTestGridName(0).equals(gridName))
+ cfg.setClientMode(true);
+
return cfg;
}
@@ -109,6 +112,13 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
}
/**
+ * @return {@code True} if test updates from client node.
+ */
+ protected boolean testClientNode() {
+ return false;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testClockFullSync() throws Exception {
@@ -167,6 +177,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
awaitPartitionMapExchange();
try {
+ assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode());
+
final IgniteCache<Object, Object> cache = grid(0).cache(null);
final int range = 100_000;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index dd3ce27..428304c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -37,15 +37,18 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Cache Failover Test Suite");
suite.addTestSuite(GridCacheAtomicInvalidPartitionHandlingSelfTest.class);
+ suite.addTestSuite(GridCacheAtomicClientInvalidPartitionHandlingSelfTest.class);
suite.addTestSuite(GridCacheIncrementTransformTest.class);
// Failure consistency tests.
suite.addTestSuite(GridCacheAtomicRemoveFailureTest.class);
suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderRemoveFailureTest.class);
+ suite.addTestSuite(GridCacheAtomicClientRemoveFailureTest.class);
suite.addTestSuite(GridCacheDhtAtomicRemoveFailureTest.class);
suite.addTestSuite(GridCacheDhtRemoveFailureTest.class);
+ suite.addTestSuite(GridCacheDhtClientRemoveFailureTest.class);
suite.addTestSuite(GridCacheNearRemoveFailureTest.class);
suite.addTestSuite(GridCacheAtomicNearRemoveFailureTest.class);
suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderNearRemoveFailureTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 2739c6c..80c1f4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -139,7 +139,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridCacheNearPrimarySyncSelfTest.class));
suite.addTest(new TestSuite(GridCacheColocatedPrimarySyncSelfTest.class));
- suite.addTest(new TestSuite(IgniteCacheClientNodeExchangeTest.class));
+ suite.addTest(new TestSuite(IgniteCacheClientNodePartitionsExchangeTest.class));
return suite;
}
[6/8] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-23
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-23
Conflicts:
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d6f5f15c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d6f5f15c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d6f5f15c
Branch: refs/heads/ignite-23
Commit: d6f5f15c6f73dd0a643c9ef98a42f5db4b36771b
Parents: a2a6f31 ba7fddb
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 19 09:06:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 19 09:06:02 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/managers/communication/GridIoManager.java | 6 +++---
.../communication/GridLifecycleAwareMessageFilter.java | 5 ++++-
.../org/apache/ignite/testsuites/IgniteCacheTestSuite2.java | 1 +
3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d6f5f15c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------