You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/19 10:13:30 UTC

[1/6] ignite git commit: ignite-5578

Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 a7cb82962 -> 3db3266c2


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd44df9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd44df9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd44df9a

Branch: refs/heads/ignite-5578
Commit: dd44df9a76c8fbb5511a1bb415691b2e1973391d
Parents: f006bfb
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 11:39:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 11:39:08 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheAffinitySharedManager.java         | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dd44df9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8a293ae..a59f5d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -518,11 +518,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         for (DynamicCacheDescriptor desc : startDescs) {
-            CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+            if (desc.cacheConfiguration().getCacheMode() != LOCAL) {
+                CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
-            assert grp != null;
+                assert grp != null;
 
-            grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+                grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+            }
         }
 
         cctx.cache().initCacheProxies(topVer, null);


[4/6] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-5578-locJoin

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578-locJoin


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ba46cbd5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ba46cbd5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ba46cbd5

Branch: refs/heads/ignite-5578
Commit: ba46cbd59f7bade48e91c3c4d5514c3bfef9a153
Parents: 8fb60ff cde3da4
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:59:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:59:43 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  20 +++
 .../communication/GridIoMessageFactory.java     |   2 +-
 .../MetaPageUpdatePartitionDataRecord.java      |   2 +
 .../cache/CacheObjectsReleaseFuture.java        |  60 +++++++++
 .../cache/GridCacheExplicitLockSpan.java        |  10 +-
 .../processors/cache/GridCacheMvccManager.java  |  24 +++-
 .../cache/GridCacheSharedContext.java           |   4 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |   9 +-
 ...GridNearAtomicSingleUpdateInvokeRequest.java |   6 +
 .../atomic/GridNearAtomicUpdateResponse.java    |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  30 +++--
 .../persistence/GridCacheOffheapManager.java    |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  44 ++++++-
 .../cache/transactions/IgniteTxManager.java     |   5 +-
 .../service/GridServiceProcessor.java           |   4 +-
 .../processors/service/ServiceContextImpl.java  |   2 +
 .../util/future/GridCompoundFuture.java         |   6 +-
 .../util/tostring/GridToStringBuilder.java      |   2 +-
 .../spi/IgniteSpiOperationTimeoutHelper.java    |   8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 131 ++++++++++++-------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  72 +++++++---
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  16 ++-
 .../ignite/internal/GridVersionSelfTest.java    |   2 +
 .../IgniteClientReconnectAbstractTest.java      |   5 +
 ...UpdateNotifierPerClusterSettingSelfTest.java |   2 +
 .../cache/GridCachePutAllFailoverSelfTest.java  |  15 +++
 .../cache/IgniteCachePutAllRestartTest.java     |  15 +++
 .../CacheGetInsideLockChangingTopologyTest.java |   5 +
 .../GridCacheAbstractNodeRestartSelfTest.java   |  12 +-
 ...titionEvictionDuringReadThroughSelfTest.java |   1 +
 ...niteBinaryMetadataUpdateNodeRestartTest.java |  10 ++
 .../distributed/IgniteCacheGetRestartTest.java  |   5 +
 .../IgniteCacheNearRestartRollbackSelfTest.java |  15 +++
 ...teSynchronizationModesMultithreadedTest.java |   5 +
 .../IgniteCachePutRetryAbstractSelfTest.java    |   5 +
 .../GridCacheReplicatedNodeRestartSelfTest.java |   2 +-
 .../processors/database/BPlusTreeSelfTest.java  |   4 +-
 .../database/FreeListImplSelfTest.java          |   4 +-
 .../org/apache/ignite/spi/GridTcpForwarder.java |  26 ++++
 .../tcp/TcpCommunicationSpiDropNodesTest.java   |  15 +++
 .../TcpCommunicationSpiFaultyClientTest.java    |  20 ++-
 .../IgniteCacheRestartTestSuite2.java           |   2 +
 .../cache/IgnitePutTxLoadBenchmark.java         |   3 +-
 43 files changed, 511 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ba46cbd5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 003c2f0,29c89a5..97e06bf
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@@ -870,16 -869,10 +870,16 @@@ public class GridIoMessageFactory imple
  
                  break;
  
 -            // [-3..119] [124..127] [-23..-27] [-36..-55]- this
 +            case 128:
 +                msg = new CacheGroupAffinityMessage();
 +
 +                break;
 +
 +
 +            // [-3..119] [124..128] [-23..-27] [-36..-55]- this
              // [120..123] - DR
              // [-4..-22, -30..-35] - SQL
-             // [-54..-60] - Snapshots
+             // [2048..2053] - Snapshots
              default:
                  if (ext != null) {
                      for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ba46cbd5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2c9119f,c4a4f83..4f572df
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -898,16 -896,18 +903,17 @@@ public class GridDhtPartitionsExchangeF
          try {
              long start = U.currentTimeMillis();
  
 -            IgniteInternalFuture fut = cctx.snapshot()
 -                .tryStartLocalSnapshotOperation(discoEvt);
 +            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
  
-             if (fut != null)
+             if (fut != null) {
                  fut.get();
  
-             long end = U.currentTimeMillis();
+                 long end = U.currentTimeMillis();
  
-             if (log.isInfoEnabled())
-                 log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() +
-                     ", time=" + (end - start) + "ms]");
+                 if (log.isInfoEnabled())
+                     log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() +
+                         ", time=" + (end - start) + "ms]");
+             }
          }
          catch (IgniteCheckedException e) {
              U.error(log, "Error while starting snapshot operation", e);


[6/6] ignite git commit: Merge branch 'ignite-5578-locJoin' into ignite-5578

Posted by sb...@apache.org.
Merge branch 'ignite-5578-locJoin' into ignite-5578

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3db3266c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3db3266c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3db3266c

Branch: refs/heads/ignite-5578
Commit: 3db3266c2290948da2cfda302a844cf64727b356
Parents: 5978213
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 13:13:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 13:13:21 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 46 +-------------------
 .../preloader/CacheGroupAffinityMessage.java    |  2 +-
 2 files changed, 2 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3db3266c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1fc59bb..7f55e79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1289,7 +1289,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         });
     }
 
-    public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
+    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
         throws IgniteCheckedException {
         final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
 
@@ -1370,50 +1370,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
-     * @param msg Message.
-     */
-    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
-        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
-
-        if (F.isEmpty(affReq))
-            return;
-
-        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
-
-        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
-
-        assert !F.isEmpty(joinedNodeAff) : msg;
-        assert joinedNodeAff.size() >= affReq.size();
-
-        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
-            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                if (affReq.contains(aff.groupId())) {
-                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
-
-                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
-
-                    assert affMsg != null;
-
-                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
-
-                    // Calculate ideal assignments.
-                    if (!aff.centralizedAffinityFunction())
-                        aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
-                    aff.initialize(fut.topologyVersion(), assignments);
-
-                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
-
-                    assert grp != null;
-
-                    grp.topology().initPartitions(fut);
-                }
-            }
-        });
-    }
-
-    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3db3266c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index b310308..fcfec1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -250,7 +250,7 @@ public class CacheGroupAffinityMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 1;
+        return 2;
     }
 
     /** {@inheritDoc} */


[2/6] ignite git commit: ignite-5578 join

Posted by sb...@apache.org.
ignite-5578 join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056847c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056847c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056847c0

Branch: refs/heads/ignite-5578
Commit: 056847c091d091d678f6c96432d00e196115c3e7
Parents: dd44df9
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:19:03 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:19:03 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 17 -----
 .../preloader/CacheGroupAffinityMessage.java    | 75 ++++++++------------
 .../GridDhtPartitionsExchangeFuture.java        |  4 +-
 .../preloader/GridDhtPartitionsFullMessage.java | 37 +++-------
 4 files changed, 43 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cfc3671..80121e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2256,23 +2256,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return super.values();
         }
 
-        /**
-         * @param exchangeId Exchange ID.
-         * @return Future.
-         */
-        public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) {
-            ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size() - 1);
-
-            while (it.hasPrevious()) {
-                GridDhtPartitionsExchangeFuture fut0 = it.previous();
-
-                if (fut0.exchangeId().equals(exchangeId))
-                    return fut0;
-            }
-
-            return null;
-        }
-
         /** {@inheritDoc} */
         @Override public synchronized String toString() {
             return S.toString(ExchangeFutureSet.class, this, super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 5cd5d26..ee4ef45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -45,9 +45,6 @@ public class CacheGroupAffinityMessage implements Message {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private int grpId;
-
-    /** */
     @GridDirectCollection(GridLongList.class)
     private List<GridLongList> assigns;
 
@@ -59,12 +56,9 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
-     * @param grpId Group ID.
      * @param assign0 Assignment.
      */
-    private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) {
-        this.grpId = grpId;
-
+    private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0) {
         assigns = new ArrayList<>(assign0.size());
 
         for (int i = 0; i < assign0.size(); i++) {
@@ -80,13 +74,6 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
-     * @return Cache group ID.
-     */
-    int groupId() {
-        return grpId;
-    }
-
-    /**
      * @param cctx Context.
      * @param topVer Topology version.
      * @param affReq Cache group IDs.
@@ -115,34 +102,46 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
+     * @param assign Nodes orders.
      * @param nodesByOrder Nodes by order cache.
      * @param discoCache Discovery data cache.
-     * @return Assignments.
+     * @return Nodes list.
      */
-    List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
-        List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+    public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+        List<ClusterNode> assign0 = new ArrayList<>(assign.size());
 
-        for (int p = 0; p < assigns.size(); p++) {
-            GridLongList assign = assigns.get(p);
-            List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+        for (int n = 0; n < assign.size(); n++) {
+            long order = assign.get(n);
 
-            for (int n = 0; n < assign.size(); n++) {
-                long order = assign.get(n);
+            ClusterNode affNode = nodesByOrder.get(order);
 
-                ClusterNode affNode = nodesByOrder.get(order);
+            if (affNode == null) {
+                affNode = discoCache.serverNodeByOrder(order);
 
-                if (affNode == null) {
-                    affNode = discoCache.serverNodeByOrder(order);
+                assert affNode != null : "Failed to find node by order [order=" + order +
+                    ", topVer=" + discoCache.version() + ']';
 
-                    assert affNode != null : order;
+                nodesByOrder.put(order, affNode);
+            }
 
-                    nodesByOrder.put(order, affNode);
-                }
+            assign0.add(affNode);
+        }
 
-                assign0.add(affNode);
-            }
+        return assign0;
+    }
 
-            assignments0.add(assign0);
+    /**
+     * @param nodesByOrder Nodes by order cache.
+     * @param discoCache Discovery data cache.
+     * @return Assignments.
+     */
+    public List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+        List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+
+        for (int p = 0; p < assigns.size(); p++) {
+            GridLongList assign = assigns.get(p);
+
+            assignments0.add(toNodes(assign, nodesByOrder, discoCache));
         }
 
         return assignments0;
@@ -167,12 +166,6 @@ public class CacheGroupAffinityMessage implements Message {
 
                 writer.incrementState();
 
-            case 1:
-                if (!writer.writeInt("grpId", grpId))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -194,14 +187,6 @@ public class CacheGroupAffinityMessage implements Message {
 
                 reader.incrementState();
 
-            case 1:
-                grpId = reader.readInt("grpId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(CacheGroupAffinityMessage.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index fa30fa2..6da7876 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -227,6 +227,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private ExchangeContext exchCtx;
 
     /** */
+    @GridToStringExclude
     private FinishState finishState;
 
     /**
@@ -873,7 +874,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                 continue;
 
-            if (!localJoinExchange() || grp.affinity().lastVersion().topologyVersion() > 0)
+            // It is possible affinity is not initialized yet if node joins to cluster.
+            if (grp.affinity().lastVersion().topologyVersion() > 0)
                 grp.topology().beforeExchange(this, !centralizedAff);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index edc9c9e..1ea8757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -103,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private transient boolean compress;
 
     /** */
-    @GridDirectCollection(CacheGroupAffinityMessage.class)
-    private Collection<CacheGroupAffinityMessage> cachesAff;
+    @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+    private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
 
     /**
      * Required by {@link Externalizable}.
@@ -148,37 +148,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
         cp.topVer = topVer;
-        cp.cachesAff = cachesAff;
+        cp.joinedNodeAff = joinedNodeAff;
     }
 
     /**
-     * @param cachesAff Affinity.
      * @return Message copy.
      */
-    GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
-        assert !F.isEmpty(cachesAff) : cachesAff;
-
+    GridDhtPartitionsFullMessage copy() {
         GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
 
         copyStateTo(cp);
 
-        cp.cachesAff = cachesAff;
-
         return cp;
     }
 
     /**
-     * @return Affinity.
+     * @return Caches affinity for joining nodes.
      */
-    @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() {
-        return cachesAff;
+    @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() {
+        return joinedNodeAff;
     }
 
     /**
-     * @param cachesAff Affinity.
+     * @param joinedNodeAff Caches affinity for joining nodes.
      */
-    void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
-        this.cachesAff = cachesAff;
+    void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+        this.joinedNodeAff = joinedNodeAff;
     }
 
     /** {@inheritDoc} */
@@ -461,11 +456,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 5:
-                if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
 
             case 6:
                 if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
@@ -525,13 +515,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 5:
-                cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
 
             case 6:
                 dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);


[3/6] ignite git commit: ignite-5578

Posted by sb...@apache.org.
ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8fb60ffc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8fb60ffc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8fb60ffc

Branch: refs/heads/ignite-5578
Commit: 8fb60ffcff592b7128ae88151f0fa07c016e0126
Parents: 056847c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:50:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:50:36 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 14 ++++
 .../discovery/GridDiscoveryManager.java         | 31 +++++++--
 .../cache/CacheAffinitySharedManager.java       | 46 ++++++++++++
 .../preloader/CacheGroupAffinityMessage.java    |  4 +-
 .../GridDhtPartitionsExchangeFuture.java        | 73 +++++---------------
 .../preloader/GridDhtPartitionsFullMessage.java | 20 ++++--
 6 files changed, 120 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index f63c5f6..1d8cfdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -85,7 +86,11 @@ public class DiscoCache {
     /** */
     private final IgniteProductVersion minNodeVer;
 
+    /** */
+    private final AffinityTopologyVersion topVer;
+
     /**
+     * @param topVer Topology version.
      * @param state Current cluster state.
      * @param loc Local node.
      * @param rmtNodes Remote nodes.
@@ -101,6 +106,7 @@ public class DiscoCache {
      * @param alives Alive nodes.
      */
     DiscoCache(
+        AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
         List<ClusterNode> rmtNodes,
@@ -114,6 +120,7 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
         Set<UUID> alives) {
+        this.topVer = topVer;
         this.state = state;
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -143,6 +150,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion version() {
+        return topVer;
+    }
+
+    /**
      * @return Minimum node version.
      */
     public IgniteProductVersion minimumNodeVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 347f6fe..1e34f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -620,7 +620,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 ChangeGlobalStateFinishMessage stateFinishMsg = null;
 
                 if (locJoinEvt) {
-                    discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                    discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
+                        ctx.state().clusterState(),
+                        locNode,
+                        topSnapshot);
 
                     transitionWaitFut = ctx.state().onLocalJoin(discoCache);
                 }
@@ -643,7 +646,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
                         ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
 
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(topSnap.get().topVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                         topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
 
@@ -691,7 +697,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
                     if (discoCache == null)
-                        discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+                        discoCache = createDiscoCache(
+                            nextTopVer,
+                            ctx.state().clusterState(),
+                            locNode,
+                            topSnapshot);
 
                     discoCacheHist.put(nextTopVer, discoCache);
 
@@ -761,8 +771,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     topHist.clear();
 
-                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
+                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
+                        AffinityTopologyVersion.ZERO,
+                        ctx.state().clusterState(),
+                        locNode,
+                        Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -2170,12 +2183,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Called from discovery thread.
      *
+     * @param topVer Topology version.
      * @param state Current state.
      * @param loc Local node.
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.
      */
-    @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+    @NotNull private DiscoCache createDiscoCache(
+        AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState state,
         ClusterNode loc,
         Collection<ClusterNode> topSnapshot) {
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2252,6 +2268,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         return new DiscoCache(
+            topVer,
             state,
             loc,
             Collections.unmodifiableList(rmtNodes),
@@ -2394,7 +2411,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(null, node, empty),
+                            createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty),
                             empty,
                             null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a59f5d7..bb27613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -51,8 +51,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -1246,6 +1248,50 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param fut Exchange future.
+     * @param msg Message.
+     */
+    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
+        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+        if (F.isEmpty(affReq))
+            return;
+
+        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+        assert !F.isEmpty(joinedNodeAff) : msg;
+        assert joinedNodeAff.size() >= affReq.size();
+
+        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                if (affReq.contains(aff.groupId())) {
+                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+                    assert affMsg != null;
+
+                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
+
+                    // Calculate ideal assignments.
+                    if (!aff.centralizedAffinityFunction())
+                        aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+                    aff.initialize(fut.topologyVersion(), assignments);
+
+                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+                    assert grp != null;
+
+                    grp.topology().initPartitions(fut);
+                }
+            }
+        });
+    }
+
+    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index ee4ef45..726054d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -94,7 +94,7 @@ public class CacheGroupAffinityMessage implements Message {
             if (!cachesAff.containsKey(grpId)) {
                 List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer);
 
-                cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign));
+                cachesAff.put(grpId, new CacheGroupAffinityMessage(assign));
             }
         }
 
@@ -199,7 +199,7 @@ public class CacheGroupAffinityMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 1;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6da7876..2c9119f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1189,10 +1189,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param nodes Nodes.
-     * @param cachesAff Affinity if was requested by some nodes.
+     * @param joinedNodeAff Affinity if was requested by some nodes.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff)
+    private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
         throws IgniteCheckedException {
         boolean singleNode = nodes.size() == 1;
 
@@ -1212,15 +1212,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         for (ClusterNode node : nodes) {
             GridDhtPartitionsFullMessage sndMsg = msg;
 
-            if (cachesAff != null) {
+            if (joinedNodeAff != null) {
                 if (singleNode)
-                    msg.cachesAffinity(cachesAff);
+                    msg.joinedNodeAffinity(joinedNodeAff);
                 else {
                     GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
 
                     if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
-                        if (msgWithAff == null)
-                            msgWithAff = msg.copyWithAffinity(cachesAff);
+                        if (msgWithAff == null) {
+                            msgWithAff = msg.copy();
+
+                            msgWithAff.joinedNodeAffinity(joinedNodeAff);
+                        }
 
                         sndMsg = msgWithAff;
                     }
@@ -1747,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
+            Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
                 GridDhtPartitionsSingleMessage msg = e.getValue();
@@ -1770,10 +1773,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
                 if (affReq != null) {
-                    cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
                         topologyVersion(),
                         affReq,
-                        cachesAff);
+                        joinedNodeAff);
 
                     UUID nodeId = e.getKey();
 
@@ -1880,7 +1883,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
+                    sendAllPartitions(nodes, joinedNodeAff);
 
                 onDone(exchangeId().topologyVersion(), err);
             }
@@ -1919,19 +1922,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (n != null) {
                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                Collection<CacheGroupAffinityMessage> cachesAff = null;
+                Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
 
                 if (affReq != null) {
-                    Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages(
+                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(
                         cctx,
                         msg.exchangeId().topologyVersion(),
                         affReq,
                         null);
-
-                    cachesAff = affMap.values();
                 }
 
-                sendAllPartitions(F.asList(n), cachesAff);
+                sendAllPartitions(F.asList(n), joinedNodeAff);
             }
         }
         catch (IgniteCheckedException e) {
@@ -2055,46 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
 
-        if (localJoinExchange() && affReq != null) {
-            Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
-
-            Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
-
-            assert !F.isEmpty(cachesAff) : msg;
-            assert cachesAff.size() >= affReq.size();
-
-            int cnt = 0;
-
-            for (CacheGroupAffinityMessage aff : cachesAff) {
-                if (affReq.contains(aff.groupId())) {
-                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
-
-                    assert grp != null : aff.groupId();
-                    assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
-
-                    List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache);
-
-                    // Calculate ideal assignments.
-                    if (!grp.affinity().centralizedAffinityFunction())
-                        grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
-
-                    grp.affinity().initialize(topologyVersion(), assignments);
-
-                    try {
-                        grp.topology().initPartitions(this);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.warn(log, "Interrupted when initialize local partitions.");
-
-                        return;
-                    }
-
-                    cnt++;
-                }
-            }
-
-            assert affReq.size() == cnt : cnt;
-        }
+        if (localJoinExchange() && affReq != null)
+            cctx.affinity().onLocalJoin(this, msg);
 
         updatePartitionFullMap(msg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1ea8757..a4258c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -456,15 +456,20 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
+            case 5:
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
 
             case 6:
-                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                if (!writer.writeByteArray("errsBytes", errsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("errsBytes", errsBytes))
+                if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -515,9 +520,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
+            case 5:
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
 
             case 6:
-                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+                errsBytes = reader.readByteArray("errsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -525,7 +537,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                errsBytes = reader.readByteArray("errsBytes");
+                joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;


[5/6] ignite git commit: Merge branch 'ignite-5578-locJoin' into ignite-5578

Posted by sb...@apache.org.
Merge branch 'ignite-5578-locJoin' into ignite-5578

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5978213e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5978213e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5978213e

Branch: refs/heads/ignite-5578
Commit: 5978213e1e8c8d81d4b33ced4e38bec1edd5cee5
Parents: a7cb829 ba46cbd
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 13:13:15 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 13:13:15 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |  3 +-
 .../discovery/GridDiscoveryManager.java         |  5 +-
 .../cache/CacheAffinitySharedManager.java       | 52 ++++++++++++++++++--
 .../GridCachePartitionExchangeManager.java      | 17 -------
 .../preloader/CacheGroupAffinityMessage.java    |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  7 ++-
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 7 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6ec9b73,1e34f0c..8d309ed
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -697,7 -697,11 +697,7 @@@ public class GridDiscoveryManager exten
                  // event notifications, since SPI notifies manager about all events from this listener.
                  if (verChanged) {
                      if (discoCache == null)
-                         discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(), locNode, topSnapshot);
 -                        discoCache = createDiscoCache(
 -                            nextTopVer,
 -                            ctx.state().clusterState(),
 -                            locNode,
 -                            topSnapshot);
++                        discoCache = createDiscoCache(nextTopVer,ctx.state().clusterState(), locNode, topSnapshot);
  
                      discoCacheHist.put(nextTopVer, discoCache);
  
@@@ -767,8 -771,11 +767,8 @@@
  
                      topHist.clear();
  
 -                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
 -                        AffinityTopologyVersion.ZERO,
 -                        ctx.state().clusterState(),
 -                        locNode,
 -                        Collections.<ClusterNode>emptySet())));
 +                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                         createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
++                        createDiscoCache(AffinityTopologyVersion.ZERO,ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
                  }
                  else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                      assert locNode.isClient() : locNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ba6a22b,bb27613..1fc59bb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -1244,130 -1247,51 +1246,174 @@@ public class CacheAffinitySharedManager
          return grpHolder.affinity();
      }
  
 +    public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) {
 +        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 +
 +        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 +
 +        log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']');
 +
 +        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
 +            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
 +                ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +                Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff();
 +
 +                List<List<ClusterNode>> idealAssignment =
 +                    aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
 +
 +                CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId()) : null;
 +
 +                List<List<ClusterNode>> newAssignment;
 +
 +                if (affMsg != null) {
 +                    Map<Integer, GridLongList> diff = affMsg.assignmentsDiff();
 +
 +                    assert !F.isEmpty(diff);
 +
 +                    newAssignment = new ArrayList<>(idealAssignment);
 +
 +                    for (Map.Entry<Integer, GridLongList> e : diff.entrySet()) {
 +                        GridLongList assign = e.getValue();
 +
 +                        newAssignment.set(e.getKey(), CacheGroupAffinityMessage.toNodes(assign,
 +                            nodesByOrder,
 +                            evts.discoveryCache()));
 +                    }
 +                }
 +                else
 +                    newAssignment = idealAssignment;
 +
 +                aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
 +            }
 +        });
 +    }
 +
 +    public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
 +        throws IgniteCheckedException {
 +        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
 +
 +        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 +
 +        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
 +
 +        assert !F.isEmpty(joinedNodeAff) : msg;
 +        assert joinedNodeAff.size() >= affReq.size();
 +
 +        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
 +            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
 +                if (affReq.contains(aff.groupId())) {
 +                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
 +
 +                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
 +
 +                    assert affMsg != null;
 +
 +                    ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
 +
 +                    // Calculate ideal assignments.
 +                    if (!aff.centralizedAffinityFunction())
 +                        aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
 +
 +                    aff.initialize(evts.topologyVersion(), assignments);
 +
 +                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
 +
 +                    assert grp != null;
 +
 +                    grp.topology().initPartitions(fut);
 +                }
 +            }
 +        });
 +    }
 +
 +    public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean crd)
 +        throws IgniteCheckedException {
 +        final ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
 +
 +        assert evts.serverJoin() && !evts.serverLeft();
 +
 +        WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd);
 +
 +        setWaitRebalanceInfo(waitRebalanceInfo, evts.waitRebalanceEventVersion(), crd);
 +    }
 +
 +    public  Map<Integer, CacheGroupAffinityMessage> mergeExchangesInitAffinityOnServerLeft(
 +        GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
 +    {
 +        final ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +        assert evts.serverLeft();
 +
 +        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
 +
 +        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
 +            @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
 +                AffinityTopologyVersion topVer = evts.topologyVersion();
 +
 +                CacheGroupHolder cache = groupHolder(topVer, desc);
 +
 +                cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache());
 +            }
 +        });
 +
 +        Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(),
 +            fut,
 +            NODE_TO_ORDER,
 +            true);
 +
 +        return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
 +    }
 +
      /**
+      * @param fut Exchange future.
+      * @param msg Message.
+      */
+     public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
+         final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+ 
+         if (F.isEmpty(affReq))
+             return;
+ 
+         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+ 
+         final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+ 
+         assert !F.isEmpty(joinedNodeAff) : msg;
+         assert joinedNodeAff.size() >= affReq.size();
+ 
+         forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                 if (affReq.contains(aff.groupId())) {
+                     assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+ 
+                     CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+ 
+                     assert affMsg != null;
+ 
+                     List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
+ 
+                     // Calculate ideal assignments.
+                     if (!aff.centralizedAffinityFunction())
+                         aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ 
+                     aff.initialize(fut.topologyVersion(), assignments);
+ 
+                     CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+ 
+                     assert grp != null;
+ 
+                     grp.topology().initPartitions(fut);
+                 }
+             }
+         });
+     }
+ 
+     /**
       * Called on exchange initiated by server node join.
       *
       * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9ae4032,4f572df..17bea14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1264,12 -1201,10 +1266,13 @@@ public class GridDhtPartitionsExchangeF
  
      /**
       * @param nodes Nodes.
+      * @param joinedNodeAff Affinity if was requested by some nodes.
       * @throws IgniteCheckedException If failed.
       */
 -    private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
 +    private void sendAllPartitions(Collection<ClusterNode> nodes,
 +        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff,
 +        Map<Integer, CacheGroupAffinityMessage> idealAffDiff,
 +        @Nullable GridDhtPartitionExchangeId msgExchId)
          throws IgniteCheckedException {
          boolean singleNode = nodes.size() == 1;
  
@@@ -2345,68 -2045,43 +2348,68 @@@
       * @param msg Message.
       */
      private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
 -        assert exchId.equals(msg.exchangeId()) : msg;
 -        assert msg.lastVersion() != null : msg;
 +        try {
 +            assert exchId.equals(msg.exchangeId()) : msg;
 +            assert msg.lastVersion() != null : msg;
  
 -        synchronized (this) {
 -            if (crd == null || finishState != null)
 -                return;
 +            synchronized (this) {
 +                if (crd == null || finishState != null)
 +                    return;
  
 -            if (!crd.equals(node)) {
 -                if (log.isDebugEnabled())
 -                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
 -                        ", nodeId=" + node.id() + ']');
 +                if (!crd.equals(node)) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
 +                            ", nodeId=" + node.id() + ']');
  
 -                if (node.order() > crd.order())
 -                    fullMsgs.put(node, msg);
 +                    if (node.order() > crd.order())
 +                        fullMsgs.put(node, msg);
  
 -                return;
 +                    return;
 +                }
 +
 +                finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
              }
  
 -            finishState = new FinishState(crd.id());
 -        }
 +            if (exchCtx.mergeExchanges()) {
 +                if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
 +                    log.info("Received full message, need merge [curFut=" + topologyVersion() +
 +                        ", resVer=" + msg.resultTopologyVersion() + ']');
 +
 +                    cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
 +                }
  
 -        Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
 +                if (localJoinExchange())
-                     cctx.affinity().onJoin(this, msg);
++                    cctx.affinity().onLocalJoin(this, msg);
 +                else {
 +                    if (exchCtx.events().serverLeft())
 +                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
 +                    else
 +                        cctx.affinity().mergeExchangesOnServerJoin(this, false);
  
 -        if (localJoinExchange() && affReq != null)
 -            cctx.affinity().onLocalJoin(this, msg);
 +                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
 +                        if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
 +                            continue;
  
 -        updatePartitionFullMap(msg);
 +                        grp.topology().beforeExchange(this, true);
 +                    }
 +                }
 +            }
  
 -        IgniteCheckedException err = null;
 +            updatePartitionFullMap(msg);
  
 -        if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
 -            err = new IgniteCheckedException("Cluster state change failed");
 +            IgniteCheckedException err = null;
  
 -            cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
 -        }
 +            if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
 +                err = new IgniteCheckedException("Cluster state change failed");
  
 -        onDone(exchId.topologyVersion(), err);
 +                cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
 +            }
 +
 +            onDone(exchCtx.events().topologyVersion(), err);
 +        }
 +        catch (IgniteCheckedException e) {
 +            onDone(e);
 +        }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------