You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:35:38 UTC

[09/50] ignite git commit: ignite-5578

ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df15780c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df15780c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df15780c

Branch: refs/heads/ignite-5578
Commit: df15780c2ee43337db2ee6e8b9be766d88430473
Parents: 80c4ddb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 12:57:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 13:29:57 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 23 ++++++++
 .../GridCachePartitionExchangeManager.java      | 55 +++++++++++++++++---
 .../GridDhtPartitionsExchangeFuture.java        | 24 ++++++---
 .../CacheExchangeCoalescingTest.java            |  6 ++-
 4 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 548d795..822eb3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1259,6 +1259,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param crd Coordinator flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onLocalJoin(boolean crd) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    public void processDiscoveryEvents(ExchangeEvents evts) {
+        AffinityTopologyVersion topVer = evts.topologyVersion();
+        // NOTE(review): topVer is read but not used by any branch below yet.
+        if (evts.serverLeft()) {
+            // Placeholder: the serverLeft() case is not handled yet.
+        }
+        else if (evts.serverJoin()) {
+            // Placeholder: the serverJoin() case is not handled yet.
+        }
+        else {
+            // Placeholder: neither serverLeft() nor serverJoin() is set; not handled yet.
+        }
+    }
+
+
+    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7bfcd6e..aebd0ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1728,14 +1728,55 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         return node.version().compareToIgnoreTimestamp(EXCHANGE_COALESCING_SINCE) >= 0;
     }
 
-    public ExchangeEvents checkExchangeCoalescing(GridDhtPartitionsExchangeFuture curFut) {
+    /** Version awaited in the exchange queue before coalescing (test hook), {@code null} to skip waiting. */
+    private volatile AffinityTopologyVersion coalesceTestWaitVer;
+
+    /**
+     * @param coalesceTestWaitVer Version awaited before coalescing exchanges (test hook), {@code null} to disable.
+     */
+    public void coalesceTestWaitVersion(AffinityTopologyVersion coalesceTestWaitVer) {
+        this.coalesceTestWaitVer = coalesceTestWaitVer;
+    }
+
+    public ExchangeEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) {
         ExchangeEvents evts = null;
 
-        try {
-            U.sleep(1000);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
+        AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer;
+
+        if (coalesceTestWaitVer != null) {
+            log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() +
+                ", waitVer=" + coalesceTestWaitVer + ']');
+
+            long end = U.currentTimeMillis() + 10_000;
+
+            while (U.currentTimeMillis() < end) {
+                boolean found = false;
+
+                for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+                    if (task instanceof GridDhtPartitionsExchangeFuture) {
+                        GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+                        if (coalesceTestWaitVer.equals(fut.topologyVersion())) {
+                            log.info("Coalesce test, found awaited version: " + coalesceTestWaitVer);
+
+                            found = true;
+
+                            break;
+                        }
+                    }
+                }
+
+                if (found)
+                    break;
+                else {
+                    try {
+                        U.sleep(100);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        break;
+                    }
+                }
+            }
         }
 
         for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
@@ -1758,6 +1799,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         evts = new ExchangeEvents();
 
                     evts.init(fut);
+
+                    exchWorker.futQ.remove(fut);
                 }
                 else
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d0ea8e7..907a032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -102,7 +102,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
+import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.EXCHANGE_COALESCING_SINCE;
 
 /**
  * Future for exchanging partition maps.
@@ -881,7 +881,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         ExchangeEvents mergedEvts = null;
 
         if (crd.isLocal())
-            mergedEvts = cctx.exchange().checkExchangeCoalescing(this);
+            mergedEvts = cctx.exchange().coalesceExchanges(this);
 
         if (crd.isLocal()) {
             if (remaining.isEmpty())
@@ -1379,6 +1379,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
+        log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']');
+
         List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
 
         synchronized (this) {
@@ -1447,17 +1449,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount());
         }
         else {
+            GridDhtPartitionsExchangeFuture mergedWith0 = null;
+
             synchronized (this) {
-                if (mergedWith != null) {
-                    mergedWith.onReceive(node, msg);
+                if (mergedWith != null)
+                    mergedWith0 = mergedWith;
+                else {
+                    if (pendingMsgs == null)
+                        pendingMsgs = new ArrayList<>();
 
-                    return;
+                    pendingMsgs.add(new T2<>(node, msg));
                 }
+            }
 
-                if (pendingMsgs == null)
-                    pendingMsgs = new ArrayList<>();
+            if (mergedWith0 != null) {
+                mergedWith0.onReceive(node, msg);
 
-                pendingMsgs.add(new T2<>(node, msg));
+                return;
             }
 
             initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/df15780c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
index 5b915f6..dbd3971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -52,7 +54,9 @@ public class CacheExchangeCoalescingTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConcurrentJoin1() throws Exception {
-        startGrid(0);
+        IgniteEx srv0 = startGrid(0);
+
+        srv0.context().cache().context().exchange().coalesceTestWaitVersion(new AffinityTopologyVersion(3, 0));
 
         final AtomicInteger idx = new AtomicInteger(1);