You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:35:38 UTC
[09/50] ignite git commit: ignite-5578
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df15780c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df15780c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df15780c
Branch: refs/heads/ignite-5578
Commit: df15780c2ee43337db2ee6e8b9be766d88430473
Parents: 80c4ddb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 12:57:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 13:29:57 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 23 ++++++++
.../GridCachePartitionExchangeManager.java | 55 +++++++++++++++++---
.../GridDhtPartitionsExchangeFuture.java | 24 ++++++---
.../CacheExchangeCoalescingTest.java | 6 ++-
4 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 548d795..822eb3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1259,6 +1259,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onLocalJoin(boolean crd) throws IgniteCheckedException {
+
+ }
+
+ public void processDiscoveryEvents(ExchangeEvents evts) {
+ AffinityTopologyVersion topVer = evts.topologyVersion();
+
+ if (evts.serverLeft()) {
+
+ }
+ else if (evts.serverJoin()) {
+
+ }
+ else {
+
+ }
+ }
+
+
+ /**
* Called on exchange initiated by server node join.
*
* @param fut Exchange future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7bfcd6e..aebd0ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1728,14 +1728,55 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return node.version().compareToIgnoreTimestamp(EXCHANGE_COALESCING_SINCE) >= 0;
}
- public ExchangeEvents checkExchangeCoalescing(GridDhtPartitionsExchangeFuture curFut) {
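+ /** Topology version the coalesce test waits to find in the exchange queue before merging; {@code null} when no wait is requested. */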
+ private volatile AffinityTopologyVersion coalesceTestWaitVer;
+
+ /**
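+ * @param coalesceTestWaitVer Topology version to wait for in the exchange queue before coalescing (test hook), or {@code null} to skip the wait.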
+ */
+ public void coalesceTestWaitVersion(AffinityTopologyVersion coalesceTestWaitVer) {
+ this.coalesceTestWaitVer = coalesceTestWaitVer;
+ }
+
+ public ExchangeEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) {
ExchangeEvents evts = null;
- try {
- U.sleep(1000);
- }
- catch (Exception e) {
- e.printStackTrace();
+ AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer;
+
+ if (coalesceTestWaitVer != null) {
+ log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() +
+ ", waitVer=" + coalesceTestWaitVer + ']');
+
+ long end = U.currentTimeMillis() + 10_000;
+
+ while (U.currentTimeMillis() < end) {
+ boolean found = false;
+
+ for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+ if (task instanceof GridDhtPartitionsExchangeFuture) {
+ GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+ if (coalesceTestWaitVer.equals(fut.topologyVersion())) {
+ log.info("Coalesce test, found awaited version: " + coalesceTestWaitVer);
+
+ found = true;
+
+ break;
+ }
+ }
+ }
+
+ if (found)
+ break;
+ else {
+ try {
+ U.sleep(100);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ break;
+ }
+ }
+ }
}
for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
@@ -1758,6 +1799,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
evts = new ExchangeEvents();
evts.init(fut);
+
+ exchWorker.futQ.remove(fut);
}
else
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d0ea8e7..907a032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -102,7 +102,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.EXCHANGE_COALESCING_SINCE;
/**
* Future for exchanging partition maps.
@@ -881,7 +881,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
ExchangeEvents mergedEvts = null;
if (crd.isLocal())
- mergedEvts = cctx.exchange().checkExchangeCoalescing(this);
+ mergedEvts = cctx.exchange().coalesceExchanges(this);
if (crd.isLocal()) {
if (remaining.isEmpty())
@@ -1379,6 +1379,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
+ log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']');
+
List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
synchronized (this) {
@@ -1447,17 +1449,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount());
}
else {
+ GridDhtPartitionsExchangeFuture mergedWith0 = null;
+
synchronized (this) {
- if (mergedWith != null) {
- mergedWith.onReceive(node, msg);
+ if (mergedWith != null)
+ mergedWith0 = mergedWith;
+ else {
+ if (pendingMsgs == null)
+ pendingMsgs = new ArrayList<>();
- return;
+ pendingMsgs.add(new T2<>(node, msg));
}
+ }
- if (pendingMsgs == null)
- pendingMsgs = new ArrayList<>();
+ if (mergedWith0 != null) {
+ mergedWith0.onReceive(node, msg);
- pendingMsgs.add(new T2<>(node, msg));
+ return;
}
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
index 5b915f6..dbd3971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -52,7 +54,9 @@ public class CacheExchangeCoalescingTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testConcurrentJoin1() throws Exception {
- startGrid(0);
+ IgniteEx srv0 = startGrid(0);
+
+ srv0.context().cache().context().exchange().coalesceTestWaitVersion(new AffinityTopologyVersion(3, 0));
final AtomicInteger idx = new AtomicInteger(1);