You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/02 14:10:41 UTC

[1/7] ignite git commit: ignite-4154

Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-3 [created] e59a532e5


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8ed7a74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8ed7a74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8ed7a74

Branch: refs/heads/ignite-4154-3
Commit: a8ed7a740ebf86731831d0125db49f54cedd11d5
Parents: 76126bb
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 1 16:59:44 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 1 16:59:44 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 72 ++++++++++----------
 1 file changed, 35 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8ed7a74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0277061..d03ba5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2140,8 +2140,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             this.discardId = discardId;
             this.customDiscardId = customDiscardId;
-
-            cleanup();
         }
 
         /**
@@ -2163,41 +2161,41 @@ class ServerImpl extends TcpDiscoveryImpl {
          *
          */
         void cleanup() {
-//            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
-//
-//            boolean skipMsg = discardId != null;
-//            boolean skipCustomMsg = customDiscardId != null;
-//
-//            while (msgIt.hasNext()) {
-//                TcpDiscoveryAbstractMessage msg0 = msgIt.next();
-//
-//                if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
-//                    if (skipCustomMsg) {
-//                        assert customDiscardId != null;
-//
-//                        if (F.eq(customDiscardId, msg0.id()))
-//                            skipCustomMsg = false;
-//                        else
-//                            msgIt.remove();
-//
-//                        continue;
-//                    }
-//                }
-//                else {
-//                    if (skipMsg) {
-//                        assert discardId != null;
-//
-//                        if (F.eq(discardId, msg0.id()))
-//                            skipMsg = false;
-//                        else
-//                            msgIt.remove();
-//
-//                        continue;
-//                    }
-//                }
-//
-//                break;
-//            }
+            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+            boolean skipMsg = discardId != null;
+            boolean skipCustomMsg = customDiscardId != null;
+
+            while (msgIt.hasNext()) {
+                TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+                if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                    if (skipCustomMsg) {
+                        assert customDiscardId != null;
+
+                        if (F.eq(customDiscardId, msg0.id()))
+                            skipCustomMsg = false;
+                        else
+                            msgIt.remove();
+
+                        continue;
+                    }
+                }
+                else {
+                    if (skipMsg) {
+                        assert discardId != null;
+
+                        if (F.eq(discardId, msg0.id()))
+                            skipMsg = false;
+                        else
+                            msgIt.remove();
+
+                        continue;
+                    }
+                }
+
+                break;
+            }
         }
 
         /**


[4/7] ignite git commit: ignite-4154

Posted by sb...@apache.org.
ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f74c9f4e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f74c9f4e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f74c9f4e

Branch: refs/heads/ignite-4154-3
Commit: f74c9f4e24dc49176fc19e84069bfc10c53133e5
Parents: 17b8291
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 09:57:25 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 09:57:25 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 53 ++++----------------
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  7 +++
 2 files changed, 17 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f74c9f4e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9179ddb..e182177 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2159,56 +2159,23 @@ class ServerImpl extends TcpDiscoveryImpl {
         void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
-            else
+            else {
                 discardId = id;
 
-            cleanup();
-        }
-
-        /**
-         *
-         */
-        void cleanup() {
-            if (discardId != null && !msgs.containsKey(discardId))
-                return;
-
-            if (customDiscardId != null && !msgs.containsKey(customDiscardId))
-                return;
-
-            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
-
-            boolean skipMsg = discardId != null;
-            boolean skipCustomMsg = customDiscardId != null;
+                TcpDiscoveryAbstractMessage msg = msgs.get(id);
 
-            while (msgIt.hasNext()) {
-                TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+                if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                    TcpDiscoveryNodeAddedMessage msg0 = (TcpDiscoveryNodeAddedMessage)msg;
 
-                if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
-                    if (skipCustomMsg) {
-                        assert customDiscardId != null;
-
-                        if (F.eq(customDiscardId, msg0.id()))
-                            skipCustomMsg = false;
-                        else
-                            msgIt.remove();
-
-                        continue;
-                    }
+                    msg0.oldNodesDiscoveryData(null);
+                    msg0.newNodeDiscoveryData(null);
                 }
-                else {
-                    if (skipMsg) {
-                        assert discardId != null;
+                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                    TcpDiscoveryNodeAddFinishedMessage msg0 = (TcpDiscoveryNodeAddFinishedMessage)msg;
 
-                        if (F.eq(discardId, msg0.id()))
-                            skipMsg = false;
-                        else
-                            msgIt.remove();
-
-                        continue;
-                    }
+                    msg0.clientDiscoData(null);
+                    msg0.clientNodeAttributes(null);
                 }
-
-                break;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74c9f4e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7b8e5c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -229,6 +229,13 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param newNodeDiscoData Discovery data from new node.
+     */
+    public void newNodeDiscoveryData(Map<Integer, byte[]> newNodeDiscoData) {
+        this.newNodeDiscoData = newNodeDiscoData;
+    }
+
+    /**
      * @return Discovery data from old nodes.
      */
     public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {


[3/7] ignite git commit: ignite-4154

Posted by sb...@apache.org.
ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17b82918
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17b82918
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17b82918

Branch: refs/heads/ignite-4154-3
Commit: 17b82918ad37c19fd6574ee1b5870c25fd9d540b
Parents: d4568ff
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 07:31:07 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 07:31:07 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 49 +++++++++++++-------
 .../TcpDiscoveryNodeAddFinishedMessage.java     | 11 +++++
 2 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ee58421..9179ddb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -1486,7 +1487,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
+                nodeAddedMsg.messages(msgs != null ? new ArrayList<>(msgs) : msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1901,6 +1902,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
 
                 if (addFinishMsg.clientDiscoData() != null) {
+                    addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
+
+                    msg = addFinishMsg;
+
                     Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
 
                     Set<UUID> replaced = null;
@@ -1960,6 +1965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) {
                         addFinishMsg.clientDiscoData(null);
+                        addFinishMsg.clientNodeAttributes(null);
 
                         break;
                     }
@@ -2091,7 +2097,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private static final int MAX = 1024;
 
         /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final LinkedHashMap<IgniteUuid, TcpDiscoveryAbstractMessage> msgs = U.newLinkedHashMap(MAX * 2);
 
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2);
@@ -2109,10 +2115,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            msgs.put(msg.id(), msg);
 
             while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+                TcpDiscoveryAbstractMessage polled = msgs.remove(msgs.keySet().iterator().next());
 
                 assert polled != null;
 
@@ -2135,8 +2141,10 @@ class ServerImpl extends TcpDiscoveryImpl {
         ) {
             this.msgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                for (TcpDiscoveryAbstractMessage msg : msgs)
+                    this.msgs.put(msg.id(), msg);
+            }
 
             this.discardId = discardId;
             this.customDiscardId = customDiscardId;
@@ -2148,21 +2156,26 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param id Discarded message ID.
          * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
          */
-        void discard(IgniteUuid id, boolean custom, boolean cleanup) {
+        void discard(IgniteUuid id, boolean custom) {
             if (custom)
                 customDiscardId = id;
             else
                 discardId = id;
 
-            if (cleanup)
-                cleanup();
+            cleanup();
         }
 
         /**
          *
          */
         void cleanup() {
-            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            if (discardId != null && !msgs.containsKey(discardId))
+                return;
+
+            if (customDiscardId != null && !msgs.containsKey(customDiscardId))
+                return;
+
+            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
 
             boolean skipMsg = discardId != null;
             boolean skipCustomMsg = customDiscardId != null;
@@ -2219,7 +2232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             private boolean skipCustomMsg = customDiscardId != null;
 
             /** Internal iterator. */
-            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.values().iterator();
 
             /** Next message. */
             private TcpDiscoveryAbstractMessage next;
@@ -2837,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
-                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+                                    prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs.values(),
                                         pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
@@ -2881,8 +2894,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
-                                    pendingMsgs.customDiscardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs.values(),
+                                    pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -3045,8 +3058,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         debugLog(msg, "Pending messages will be resent to local node");
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
-                            pendingMsgs.customDiscardId);
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs.values(),
+                            pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                         pendingMsg.senderNodeId(locNodeId);
 
@@ -3106,7 +3119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (pendingMsgs.msgs.isEmpty())
                 return false;
 
-            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+            for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs.values()) {
                 if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) {
                     TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg;
 
@@ -4933,7 +4946,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
+                pendingMsgs.discard(msgId, msg.customMessageDiscard());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/17b82918/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 1b99a56..80f4565 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+        super(msg);
+
+        nodeId = msg.nodeId;
+        clientDiscoData = msg.clientDiscoData;
+        clientNodeAttrs = msg.clientNodeAttrs;
+    }
+
+    /**
      * Gets ID of the node added.
      *
      * @return ID of the node added.


[7/7] ignite git commit: ignite-4154 zip

Posted by sb...@apache.org.
ignite-4154 zip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e59a532e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e59a532e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e59a532e

Branch: refs/heads/ignite-4154-3
Commit: e59a532e59dbeeb089897e4f52a3daa78afc3923
Parents: d5d58f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 16:07:15 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 17:10:26 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 15 +++++-
 .../GridDhtPartitionsAbstractMessage.java       | 34 ++++++++++++-
 .../GridDhtPartitionsExchangeFuture.java        | 14 +++++-
 .../preloader/GridDhtPartitionsFullMessage.java | 53 ++++++++++++++++----
 .../GridDhtPartitionsSingleMessage.java         | 52 ++++++++++++++-----
 .../GridDhtPartitionsSingleRequest.java         |  4 +-
 .../ignite/internal/util/IgniteUtils.java       | 52 +++++++++++++++++++
 7 files changed, 194 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a901e2a..a81bf0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -765,12 +766,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
 
         boolean useOldApi = false;
+        boolean compress = true;
 
         for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                 useOldApi = true;
+                compress = false;
+
+                break;
+            }
+            else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+                compress = false;
         }
 
+        m.compress(compress);
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal() && cacheCtx.started()) {
                 GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
@@ -817,7 +827,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
-            cctx.versions().last());
+            cctx.versions().last(),
+            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 4e714ed..a3bb5f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Request for single partition info.
  */
-abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+    /** */
+    public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
+
+    /** */
+    protected static final byte COMPRESSED_FLAG_MASK = 1;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
     /** Last used cache version. */
     private GridCacheVersion lastVer;
 
+    /** */
+    private byte flags;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -79,6 +89,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
         return lastVer;
     }
 
+    protected final boolean compressed() {
+        return (flags & COMPRESSED_FLAG_MASK) != 0;
+    }
+
+    protected final void compressed(boolean compressed) {
+        flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -101,6 +119,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("lastVer", lastVer))
                     return false;
 
@@ -131,6 +155,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 lastVer = reader.readMessage("lastVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 80b3768..6a17583 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -935,7 +935,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         throws IgniteCheckedException {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
-            cctx.versions().last());
+            cctx.versions().last(),
+            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
@@ -974,14 +975,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             topologyVersion());
 
         boolean useOldApi = false;
+        boolean compress = true;
 
         if (nodes != null) {
             for (ClusterNode node : nodes) {
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                     useOldApi = true;
+                    compress = false;
+
+                    break;
+                }
+                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+                    compress = false;
             }
         }
 
+        m.compress(compress);
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
                 AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a4ff04b..ea51f6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -61,6 +61,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    @GridDirectTransient
+    private boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -83,6 +87,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         this.topVer = topVer;
     }
 
+    public void compress(boolean compress) {
+        this.compress = compress;
+    }
+
     /**
      * @return Local partitions.
      */
@@ -137,6 +145,21 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         if (partCntrs != null && partCntrsBytes == null)
             partCntrsBytes = U.marshal(ctx, partCntrs);
+
+        if (compress && !compressed()) {
+            try {
+                byte[] partsBytesZip = U.zip(partsBytes);
+                byte[] partCntrsBytesZip = U.zip(partCntrsBytes);
+
+                partsBytes = partsBytesZip;
+                partCntrsBytes = partCntrsBytesZip;
+
+                compressed(true);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+            }
+        }
     }
 
     /**
@@ -157,14 +180,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
         if (parts == null)
             parts = new HashMap<>();
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
         if (partCntrs == null)
             partCntrs = new HashMap<>();
@@ -185,19 +216,19 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -219,7 +250,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -227,7 +258,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 6:
+            case 7:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -235,7 +266,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -255,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index e4356b1..fdfc485 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -59,6 +59,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** */
     private boolean client;
 
+    /** */
+    private boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -73,10 +76,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
-        @Nullable GridCacheVersion lastVer) {
+        @Nullable GridCacheVersion lastVer,
+        boolean compress) {
         super(exchId, lastVer);
 
         this.client = client;
+        this.compress = compress;
     }
 
     /**
@@ -141,17 +146,40 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partCntrsBytes == null && partCntrs != null)
             partCntrsBytes = U.marshal(ctx, partCntrs);
+
+        if (compress && !compressed()) {
+            try {
+                byte[] partsBytesZip = U.zip(partsBytes);
+                byte[] partCntrsBytesZip = U.zip(partCntrsBytes);
+
+                partsBytes = partsBytesZip;
+                partCntrsBytes = partCntrsBytesZip;
+
+                compressed(true);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
     }
 
     /** {@inheritDoc} */
@@ -169,19 +197,19 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
                 if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -203,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
                 client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
@@ -211,7 +239,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 6:
+            case 7:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -219,7 +247,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -239,7 +267,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index a4106af..850b6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1e8d648..da4edc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.Externalizable;
@@ -128,6 +130,8 @@ import java.util.logging.Logger;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 import javax.management.DynamicMBean;
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -9665,6 +9669,25 @@ public abstract class IgniteUtils {
         }
     }
 
+    public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
+        assert marsh != null;
+        assert zipBytes != null;
+
+        try {
+            ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes));
+
+            in.getNextEntry();
+
+            return marsh.unmarshal(in, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
     /**
      * Unmarshals object from the input stream using given class loader.
      * This method should not close given input stream.
@@ -9880,4 +9903,33 @@ public abstract class IgniteUtils {
         if (oldName != curName)
             LOC_IGNITE_NAME.set(oldName);
     }
+
+    public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException {
+        try {
+            if (bytes == null)
+                return null;
+
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+            try (ZipOutputStream zos = new ZipOutputStream(bos)) {
+                ZipEntry entry = new ZipEntry("");
+
+                try {
+                    entry.setSize(bytes.length);
+
+                    zos.putNextEntry(entry);
+
+                    zos.write(bytes);
+                }
+                finally {
+                    zos.closeEntry();
+                }
+            }
+
+            return bos.toByteArray();
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }


[2/7] ignite git commit: ignite-4154

Posted by sb...@apache.org.
ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4568ff8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4568ff8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4568ff8

Branch: refs/heads/ignite-4154-3
Commit: d4568ff86cd6af332b1789b52e170f8906c5aee0
Parents: a8ed7a7
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 1 20:18:12 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 1 20:18:12 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4568ff8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d03ba5b..ee58421 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2148,13 +2148,14 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param id Discarded message ID.
          * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}.
          */
-        void discard(IgniteUuid id, boolean custom) {
+        void discard(IgniteUuid id, boolean custom, boolean cleanup) {
             if (custom)
                 customDiscardId = id;
             else
                 discardId = id;
 
-            cleanup();
+            if (cleanup)
+                cleanup();
         }
 
         /**
@@ -4932,7 +4933,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId, msg.customMessageDiscard());
+                pendingMsgs.discard(msgId, msg.customMessageDiscard(), spiState == CONNECTED);
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);


[5/7] ignite git commit: ignite-4154 affinity

Posted by sb...@apache.org.
ignite-4154 affinity


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87b09dba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87b09dba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87b09dba

Branch: refs/heads/ignite-4154-3
Commit: 87b09dba85b5ed7996ba93a10ef4f28eb398c4a8
Parents: f74c9f4
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 11:44:24 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 14:00:33 2016 +0300

----------------------------------------------------------------------
 .../processors/affinity/AffinityAssignment.java |  88 ++++++++++
 .../affinity/GridAffinityAssignment.java        | 120 ++++++-------
 .../affinity/GridAffinityAssignmentCache.java   |  83 +++++++--
 .../affinity/GridAffinityProcessor.java         |   8 +-
 .../processors/affinity/GridAffinityUtils.java  |   8 +-
 .../affinity/HistoryAffinityAssignment.java     | 169 +++++++++++++++++++
 .../cache/CacheAffinitySharedManager.java       |  38 ++++-
 .../cache/GridCacheAffinityManager.java         |   4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   4 +-
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 10 files changed, 433 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
new file mode 100644
index 0000000..06207d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Cached affinity calculations.
+ */
+public interface AffinityAssignment {
+    /**
+     * @return {@code True} if related discovery event did not not cause affinity assignment change and
+     *    this assignment is just reference to the previous one.
+     */
+    public boolean clientEventChange();
+
+    /**
+     * @return Affinity assignment computed by affinity function.
+     */
+    public List<List<ClusterNode>> idealAssignment();
+
+    /**
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignment();
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion();
+
+    /**
+     * Get affinity nodes for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes.
+     */
+    public List<ClusterNode> get(int part);
+
+    /**
+     * Get affinity node IDs for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes IDs.
+     */
+    public HashSet<UUID> getIds(int part);
+
+    /**
+     * @return Nodes having primary partitions assignments.
+     */
+    public Set<ClusterNode> primaryPartitionNodes();
+
+    /**
+     * Get primary partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get primary partitions for.
+     * @return Primary partitions for specified node ID.
+     */
+    public Set<Integer> primaryPartitions(UUID nodeId);
+
+    /**
+     * Get backup partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get backup partitions for.
+     * @return Backup partitions for specified node ID.
+     */
+    public Set<Integer> backupPartitions(UUID nodeId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 568e4e8..2940d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,12 +27,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Cached affinity calculations.
  */
-public class GridAffinityAssignment implements Serializable {
+public class GridAffinityAssignment implements AffinityAssignment, Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -86,7 +88,7 @@ public class GridAffinityAssignment implements Serializable {
 
         this.topVer = topVer;
         this.assignment = assignment;
-        this.idealAssignment = idealAssignment;
+        this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -139,96 +141,76 @@ public class GridAffinityAssignment implements Serializable {
         return topVer;
     }
 
-    /**
-     * Get affinity nodes for partition.
-     *
-     * @param part Partition.
-     * @return Affinity nodes.
-     */
-    public List<ClusterNode> get(int part) {
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
         assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
+                " [part=" + part + ", partitions=" + assignment.size() + ']';
 
         return assignment.get(part);
     }
 
-    /**
-     * Get affinity node IDs for partition.
-     *
-     * @param part Partition.
-     * @return Affinity nodes IDs.
-     */
-    public HashSet<UUID> getIds(int part) {
+    /** {@inheritDoc} */
+    @Override public HashSet<UUID> getIds(int part) {
         assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
+                " [part=" + part + ", partitions=" + assignment.size() + ']';
 
-        List<HashSet<UUID>> assignmentIds0 = assignmentIds;
+        List<ClusterNode> nodes = assignment.get(part);
 
-        if (assignmentIds0 == null) {
-            assignmentIds0 = new ArrayList<>();
+        HashSet<UUID> ids = U.newHashSet(nodes.size());
 
-            for (List<ClusterNode> assignmentPart : assignment) {
-                HashSet<UUID> partIds = new HashSet<>();
+        for (int i = 0; i < nodes.size(); i++)
+            ids.add(nodes.get(i).id());
 
-                for (ClusterNode node : assignmentPart)
-                    partIds.add(node.id());
+        return ids;
+    }
 
-                assignmentIds0.add(partIds);
-            }
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-            assignmentIds = assignmentIds0;
+            if (!F.isEmpty(nodes))
+                res.add(nodes.get(0));
         }
 
-        return assignmentIds0.get(part);
+        return res;
     }
 
-    /**
-     * @return Nodes having primary partitions assignments.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    public Set<ClusterNode> primaryPartitionNodes() {
-        Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
-
-        if (primaryPartsNodes0 == null) {
-            int parts = assignment.size();
-
-            primaryPartsNodes0 = new HashSet<>();
-
-            for (int p = 0; p < parts; p++) {
-                List<ClusterNode> nodes = assignment.get(p);
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
 
-                if (nodes.size() > 0)
-                    primaryPartsNodes0.add(nodes.get(0));
-            }
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-            primaryPartsNodes = primaryPartsNodes0;
+            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+                res.add(p);
         }
 
-        return primaryPartsNodes0;
+        return res;
     }
 
-    /**
-     * Get primary partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get primary partitions for.
-     * @return Primary partitions for specified node ID.
-     */
-    public Set<Integer> primaryPartitions(UUID nodeId) {
-        Set<Integer> set = primary.get(nodeId);
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
 
-        return set == null ? Collections.<Integer>emptySet() : set;
-    }
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
 
-    /**
-     * Get backup partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get backup partitions for.
-     * @return Backup partitions for specified node ID.
-     */
-    public Set<Integer> backupPartitions(UUID nodeId) {
-        Set<Integer> set = backup.get(nodeId);
+            for (int i = 1; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node.id().equals(nodeId)) {
+                    res.add(p);
+
+                    break;
+                }
+            }
+        }
 
-        return set == null ? Collections.<Integer>emptySet() : set;
+        return res;
     }
 
     /**
@@ -274,10 +256,10 @@ public class GridAffinityAssignment implements Serializable {
         if (o == this)
             return true;
 
-        if (o == null || getClass() != o.getClass())
+        if (o == null || !(o instanceof AffinityAssignment))
             return false;
 
-        return topVer.equals(((GridAffinityAssignment)o).topVer);
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a81b34d..9166b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache {
     private final int partsCnt;
 
     /** Affinity calculation results cache: topology version => partition => nodes. */
-    private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache;
 
     /** */
     private List<List<ClusterNode>> idealAssignment;
@@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache {
     /** Full history size. */
     private final AtomicInteger fullHistSize = new AtomicInteger();
 
+    /** */
+    private final SimilarAffinityKey similarAffKey;
+
     /**
      * Constructs affinity cached calculations.
      *
@@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache {
     {
         assert ctx != null;
         assert aff != null;
+        assert nodeFilter != null;
 
         this.ctx = ctx;
         this.aff = aff;
@@ -142,6 +146,12 @@ public class GridAffinityAssignmentCache {
         partsCnt = aff.partitions();
         affCache = new ConcurrentSkipListMap<>();
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+        similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt);
+    }
+
+    public Object similarAffinityKey() {
+        return similarAffKey;
     }
 
     /**
@@ -170,7 +180,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
 
-        affCache.put(topVer, assignment);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignment));
         head.set(assignment);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -300,7 +310,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
 
-        affCache.put(topVer, assignmentCpy);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
         head.set(assignmentCpy);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -328,7 +338,7 @@ public class GridAffinityAssignmentCache {
      * @return Affinity assignment.
      */
     public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
-        GridAffinityAssignment aff = cachedAffinity(topVer);
+        AffinityAssignment aff = cachedAffinity(topVer);
 
         return aff.assignment();
     }
@@ -427,7 +437,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Cached affinity.
      */
-    public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
+    public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
         if (topVer.equals(AffinityTopologyVersion.NONE))
             topVer = lastVersion();
         else
@@ -435,7 +445,7 @@ public class GridAffinityAssignmentCache {
 
         assert topVer.topologyVersion() >= 0 : topVer;
 
-        GridAffinityAssignment cache = head.get();
+        AffinityAssignment cache = head.get();
 
         if (!cache.topologyVersion().equals(topVer)) {
             cache = affCache.get(topVer);
@@ -463,7 +473,7 @@ public class GridAffinityAssignmentCache {
      * @return {@code True} if primary changed or required affinity version not found in history.
      */
     public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
-        GridAffinityAssignment aff = affCache.get(startVer);
+        AffinityAssignment aff = affCache.get(startVer);
 
         if (aff == null)
             return false;
@@ -475,7 +485,7 @@ public class GridAffinityAssignmentCache {
 
         ClusterNode primary = nodes.get(0);
 
-        for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
+        for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
             List<ClusterNode> nodes0 = assignment.assignment().get(part);
 
             if (nodes0.isEmpty())
@@ -549,10 +559,10 @@ public class GridAffinityAssignmentCache {
         }
 
         if (rmvCnt > 0) {
-            Iterator<GridAffinityAssignment> it = affCache.values().iterator();
+            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
 
             while (it.hasNext() && rmvCnt > 0) {
-                GridAffinityAssignment aff0 = it.next();
+                AffinityAssignment aff0 = it.next();
 
                 it.remove();
 
@@ -602,4 +612,57 @@ public class GridAffinityAssignmentCache {
             return S.toString(AffinityReadyFuture.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class SimilarAffinityKey {
+        /** */
+        private final int backups;
+
+        /** */
+        private final Class<?> affFuncCls;
+
+        /** */
+        private final Class<?> filterCls;
+
+        /** */
+        private final int partsCnt;
+
+        /** */
+        private final int hash;
+
+        public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+            this.backups = backups;
+            this.affFuncCls = affFuncCls;
+            this.filterCls = filterCls;
+            this.partsCnt = partsCnt;
+
+            int hash = backups;
+            hash = 31 * hash + affFuncCls.hashCode();
+            hash = 31 * hash + filterCls.hashCode();
+            hash= 31 * hash + partsCnt;
+
+            this.hash = hash;
+        }
+
+        @Override public int hashCode() {
+            return hash;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+            return backups == key.backups &&
+                affFuncCls == key.affFuncCls &&
+                filterCls == key.filterCls &&
+                partsCnt == key.partsCnt;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 1726d02..7c22ef5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -385,10 +385,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             }
 
             try {
+                AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+                GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+                        (GridAffinityAssignment)assign0 :
+                        new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),
                     cctx.config().getAffinityMapper(),
-                    new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)),
+                    assign,
                     cctx.cacheObjectContext());
 
                 IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index c24dd2d..abd5292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -180,10 +180,16 @@ class GridAffinityUtils {
 
             cctx.affinity().affinityReadyFuture(topVer).get();
 
+            AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+            GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+                (GridAffinityAssignment)assign0 :
+                new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),
                 affinityMessage(ctx, cctx.config().getAffinityMapper()),
-                new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)));
+                assign);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
new file mode 100644
index 0000000..e502dd5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class HistoryAffinityAssignment implements AffinityAssignment {
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final List<List<ClusterNode>> assignment;
+
+    /** */
+    private final List<List<ClusterNode>> idealAssignment;
+
+    /** */
+    private final boolean clientEvtChange;
+
+    /**
+     * @param assign Assignment.
+     */
+    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+        this.topVer = assign.topologyVersion();
+        this.assignment = assign.assignment();
+        this.idealAssignment = assign.idealAssignment();
+        this.clientEvtChange = assign.clientEventChange();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientEventChange() {
+        return clientEvtChange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> idealAssignment() {
+        return idealAssignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        return assignment.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HashSet<UUID> getIds(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        List<ClusterNode> nodes = assignment.get(part);
+
+        HashSet<UUID> ids = U.newHashSet(nodes.size());
+
+        for (int i = 0; i < nodes.size(); i++)
+            ids.add(nodes.get(i).id());
+
+        return ids;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes))
+                res.add(nodes.get(0));
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+                res.add(p);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            for (int i = 1; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node.id().equals(nodeId)) {
+                    res.add(p);
+
+                    break;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return topVer.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || !(o instanceof AffinityAssignment))
+            return false;
+
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HistoryAffinityAssignment.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1aedf4e..88f1f97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert assignment != null;
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 List<List<ClusterNode>> idealAssignment = aff.idealAssignment();
@@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 else
                     newAssignment = idealAssignment;
 
-                aff.initialize(topVer, newAssignment);
+                aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
             }
         });
     }
@@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 AffinityTopologyVersion affTopVer = aff.lastVersion();
@@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         assignment.set(part, nodes);
                     }
 
-                    aff.initialize(topVer, assignment);
+                    aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
                 }
                 else
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         if (!crd) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
@@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 boolean latePrimary = cacheCtx.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary);
+                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary);
+                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
                 }
             });
 
@@ -1245,7 +1251,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
-        boolean latePrimary)
+        boolean latePrimary,
+        Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
         assert lateAffAssign;
@@ -1292,7 +1299,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), newAssignment);
+        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+    }
+
+    /**
+     * @param aff
+     * @param assign
+     * @param affCache
+     * @return
+     */
+    private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
+        List<List<ClusterNode>> assign,
+        Map<Object, List<List<ClusterNode>>> affCache) {
+        List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey());
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0;
+        else
+            affCache.put(aff.similarAffinityKey(), assign);
+
+        return assign;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 71ae5c9..6e5a28e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      * @param topVer Topology version.
      * @return Affinity assignment.
      */
-    public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
+    public AffinityAssignment assignment(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
             topVer = LOC_CACHE_TOP_VER;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 50f7f0f..871a084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -859,7 +859,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
-        GridAffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/87b09dba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..d6865c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                         ", node=" + node + ']');
 
-                GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+                AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
                 boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
 


[6/7] ignite git commit: Merge branch 'ignite-4154' into ignite-4154-2

Posted by sb...@apache.org.
Merge branch 'ignite-4154' into ignite-4154-2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5d58f0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5d58f0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5d58f0a

Branch: refs/heads/ignite-4154-3
Commit: d5d58f0af5339345f610edfae7be1e179ce16821
Parents: 87b09db 8c624a8
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 2 14:01:04 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 2 14:01:04 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/distributed/IgniteCacheGetRestartTest.java   | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------