You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:12 UTC
[23/50] [abbrv] ignite git commit: ignite-4154 Optimize amount of
data stored in discovery history Discovery history optimizations: - remove
discarded message for discovery pending messages - remove duplicated data
from TcpDiscoveryNodeAddedMessage.oldNo
ignite-4154 Optimize amount of data stored in discovery history
Discovery history optimizations:
- remove discarded message for discovery pending messages
- remove duplicated data from TcpDiscoveryNodeAddedMessage.oldNodesDiscoData
- do not store unnecessary data in discovery EnsuredMessageHistory
- use special property for EnsuredMessageHistory size instead of IGNITE_DISCOVERY_HISTORY_SIZE
Affinity history optimizations:
- do not store calculated primary/backup maps in history
- try save the same assignments instance for caches with similar affinity
Exchange messages optimizations:
- do not send duplicated partition state maps for caches with similar affinity
- use zip compression for data sent in exchange messages
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7128a395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7128a395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7128a395
Branch: refs/heads/master
Commit: 7128a395085b60e86436f807b4bdbca83627d41a
Parents: 8bb8bdd
Author: sboikov <sb...@gridgain.com>
Authored: Fri Nov 11 15:29:38 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Nov 11 15:29:38 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 4 +
.../processors/affinity/AffinityAssignment.java | 88 +++++
.../affinity/GridAffinityAssignment.java | 8 +-
.../affinity/GridAffinityAssignmentCache.java | 35 +-
.../affinity/GridAffinityProcessor.java | 89 ++++-
.../processors/affinity/GridAffinityUtils.java | 8 +-
.../affinity/HistoryAffinityAssignment.java | 169 ++++++++
.../cache/CacheAffinitySharedManager.java | 57 ++-
.../cache/DynamicCacheChangeBatch.java | 7 +
.../cache/GridCacheAffinityManager.java | 6 +-
.../GridCachePartitionExchangeManager.java | 284 ++++++++++++--
.../processors/cache/GridCacheProcessor.java | 5 +-
.../dht/GridClientPartitionTopology.java | 33 +-
.../dht/GridDhtPartitionTopology.java | 3 +-
.../dht/GridDhtPartitionTopologyImpl.java | 31 +-
.../dht/preloader/GridDhtPartitionFullMap.java | 18 +
.../dht/preloader/GridDhtPartitionMap2.java | 53 ++-
.../GridDhtPartitionsAbstractMessage.java | 40 +-
.../GridDhtPartitionsExchangeFuture.java | 84 +---
.../preloader/GridDhtPartitionsFullMessage.java | 150 ++++++-
.../GridDhtPartitionsSingleMessage.java | 132 ++++++-
.../GridDhtPartitionsSingleRequest.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../ignite/internal/util/IgniteUtils.java | 64 +++
.../ignite/spi/discovery/tcp/ClientImpl.java | 26 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 234 +++++++++--
.../TcpDiscoveryNodeAddFinishedMessage.java | 11 +
.../messages/TcpDiscoveryNodeAddedMessage.java | 33 +-
...CacheExchangeMessageDuplicatedStateTest.java | 393 +++++++++++++++++++
.../cache/IgniteCachePeekModesAbstractTest.java | 2 +-
.../distributed/IgniteCacheGetRestartTest.java | 3 +
...cingDelayedPartitionMapExchangeSelfTest.java | 8 +-
.../GridCacheRebalancingSyncSelfTest.java | 18 +-
.../GridCacheSyncReplicatedPreloadSelfTest.java | 3 -
.../IgniteCacheSyncRebalanceModeSelfTest.java | 4 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../IgniteNoCustomEventsOnNodeStart.java | 7 +
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++-
.../junits/common/GridCommonAbstractTest.java | 25 +-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../cache/IgniteCacheOffheapEvictQueryTest.java | 7 +
...lientQueryReplicatedNodeRestartSelfTest.java | 7 +
...butedQueryStopOnCancelOrTimeoutSelfTest.java | 7 +
.../query/h2/sql/GridQueryParsingTest.java | 11 +-
.../src/test/config/incorrect-store-cache.xml | 2 +
.../src/test/config/jdbc-pojo-store-builtin.xml | 3 +
.../src/test/config/jdbc-pojo-store-obj.xml | 3 +
modules/spring/src/test/config/node.xml | 2 +
modules/spring/src/test/config/node1.xml | 2 +
.../test/config/pojo-incorrect-store-cache.xml | 2 +
modules/spring/src/test/config/store-cache.xml | 2 +
modules/spring/src/test/config/store-cache1.xml | 2 +
53 files changed, 2061 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ab6403f..a75027b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -383,6 +383,10 @@ public final class IgniteSystemProperties {
/** Maximum size for discovery messages history. */
public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";
+ /** Maximum number of discovery message history used to support client reconnect. */
+ public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE =
+ "IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE";
+
/** Number of cache operation retries in case of topology exceptions. */
public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
new file mode 100644
index 0000000..06207d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Cached affinity calculations.
+ */
+public interface AffinityAssignment {
+ /**
+ * @return {@code True} if related discovery event did not not cause affinity assignment change and
+ * this assignment is just reference to the previous one.
+ */
+ public boolean clientEventChange();
+
+ /**
+ * @return Affinity assignment computed by affinity function.
+ */
+ public List<List<ClusterNode>> idealAssignment();
+
+ /**
+ * @return Affinity assignment.
+ */
+ public List<List<ClusterNode>> assignment();
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion();
+
+ /**
+ * Get affinity nodes for partition.
+ *
+ * @param part Partition.
+ * @return Affinity nodes.
+ */
+ public List<ClusterNode> get(int part);
+
+ /**
+ * Get affinity node IDs for partition.
+ *
+ * @param part Partition.
+ * @return Affinity nodes IDs.
+ */
+ public HashSet<UUID> getIds(int part);
+
+ /**
+ * @return Nodes having primary partitions assignments.
+ */
+ public Set<ClusterNode> primaryPartitionNodes();
+
+ /**
+ * Get primary partitions for specified node ID.
+ *
+ * @param nodeId Node ID to get primary partitions for.
+ * @return Primary partitions for specified node ID.
+ */
+ public Set<Integer> primaryPartitions(UUID nodeId);
+
+ /**
+ * Get backup partitions for specified node ID.
+ *
+ * @param nodeId Node ID to get backup partitions for.
+ * @return Backup partitions for specified node ID.
+ */
+ public Set<Integer> backupPartitions(UUID nodeId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 568e4e8..35130a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Cached affinity calculations.
*/
-public class GridAffinityAssignment implements Serializable {
+public class GridAffinityAssignment implements AffinityAssignment, Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -86,7 +86,7 @@ public class GridAffinityAssignment implements Serializable {
this.topVer = topVer;
this.assignment = assignment;
- this.idealAssignment = idealAssignment;
+ this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
primary = new HashMap<>();
backup = new HashMap<>();
@@ -274,10 +274,10 @@ public class GridAffinityAssignment implements Serializable {
if (o == this)
return true;
- if (o == null || getClass() != o.getClass())
+ if (o == null || !(o instanceof AffinityAssignment))
return false;
- return topVer.equals(((GridAffinityAssignment)o).topVer);
+ return topVer.equals(((AffinityAssignment)o).topologyVersion());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a81b34d..a388c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache {
private final int partsCnt;
/** Affinity calculation results cache: topology version => partition => nodes. */
- private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+ private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache;
/** */
private List<List<ClusterNode>> idealAssignment;
@@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache {
/** Full history size. */
private final AtomicInteger fullHistSize = new AtomicInteger();
+ /** */
+ private final Object similarAffKey;
+
/**
* Constructs affinity cached calculations.
*
@@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache {
{
assert ctx != null;
assert aff != null;
+ assert nodeFilter != null;
this.ctx = ctx;
this.aff = aff;
@@ -142,6 +146,17 @@ public class GridAffinityAssignmentCache {
partsCnt = aff.partitions();
affCache = new ConcurrentSkipListMap<>();
head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+ similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
+
+ assert similarAffKey != null;
+ }
+
+ /**
+ * @return Key to find caches with similar affinity.
+ */
+ public Object similarAffinityKey() {
+ return similarAffKey;
}
/**
@@ -170,7 +185,7 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
- affCache.put(topVer, assignment);
+ affCache.put(topVer, new HistoryAffinityAssignment(assignment));
head.set(assignment);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -300,7 +315,7 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
- affCache.put(topVer, assignmentCpy);
+ affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
head.set(assignmentCpy);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -328,7 +343,7 @@ public class GridAffinityAssignmentCache {
* @return Affinity assignment.
*/
public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
- GridAffinityAssignment aff = cachedAffinity(topVer);
+ AffinityAssignment aff = cachedAffinity(topVer);
return aff.assignment();
}
@@ -427,7 +442,7 @@ public class GridAffinityAssignmentCache {
* @param topVer Topology version.
* @return Cached affinity.
*/
- public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
+ public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
if (topVer.equals(AffinityTopologyVersion.NONE))
topVer = lastVersion();
else
@@ -435,7 +450,7 @@ public class GridAffinityAssignmentCache {
assert topVer.topologyVersion() >= 0 : topVer;
- GridAffinityAssignment cache = head.get();
+ AffinityAssignment cache = head.get();
if (!cache.topologyVersion().equals(topVer)) {
cache = affCache.get(topVer);
@@ -463,7 +478,7 @@ public class GridAffinityAssignmentCache {
* @return {@code True} if primary changed or required affinity version not found in history.
*/
public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
- GridAffinityAssignment aff = affCache.get(startVer);
+ AffinityAssignment aff = affCache.get(startVer);
if (aff == null)
return false;
@@ -475,7 +490,7 @@ public class GridAffinityAssignmentCache {
ClusterNode primary = nodes.get(0);
- for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
+ for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
List<ClusterNode> nodes0 = assignment.assignment().get(part);
if (nodes0.isEmpty())
@@ -549,10 +564,10 @@ public class GridAffinityAssignmentCache {
}
if (rmvCnt > 0) {
- Iterator<GridAffinityAssignment> it = affCache.values().iterator();
+ Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
while (it.hasNext() && rmvCnt > 0) {
- GridAffinityAssignment aff0 = it.next();
+ AffinityAssignment aff0 = it.next();
it.remove();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 1726d02..b9182ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -385,10 +386,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
}
try {
+ AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+ GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+ (GridAffinityAssignment)assign0 :
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
AffinityInfo info = new AffinityInfo(
cctx.config().getAffinity(),
cctx.config().getAffinityMapper(),
- new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)),
+ assign,
cctx.cacheObjectContext());
IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
@@ -562,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
return nodes.iterator().next();
}
+ /**
+ * @param aff Affinity function.
+ * @param nodeFilter Node class.
+ * @param backups Number of backups.
+ * @param parts Number of partitions.
+ * @return Key to find caches with similar affinity.
+ */
+ public Object similaryAffinityKey(AffinityFunction aff,
+ IgnitePredicate<ClusterNode> nodeFilter,
+ int backups,
+ int parts) {
+ return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts);
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
@@ -960,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
return aff;
}
}
+
+ /**
+ *
+ */
+ private static class SimilarAffinityKey {
+ /** */
+ private final int backups;
+
+ /** */
+ private final Class<?> affFuncCls;
+
+ /** */
+ private final Class<?> filterCls;
+
+ /** */
+ private final int partsCnt;
+
+ /** */
+ private final int hash;
+
+ /**
+ * @param affFuncCls Affinity function class.
+ * @param filterCls Node filter class.
+ * @param backups Number of backups.
+ * @param partsCnt Number of partitions.
+ */
+ SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+ this.backups = backups;
+ this.affFuncCls = affFuncCls;
+ this.filterCls = filterCls;
+ this.partsCnt = partsCnt;
+
+ int hash = backups;
+ hash = 31 * hash + affFuncCls.hashCode();
+ hash = 31 * hash + filterCls.hashCode();
+ hash= 31 * hash + partsCnt;
+
+ this.hash = hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+ return backups == key.backups &&
+ affFuncCls == key.affFuncCls &&
+ filterCls == key.filterCls &&
+ partsCnt == key.partsCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SimilarAffinityKey.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index c24dd2d..abd5292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -180,10 +180,16 @@ class GridAffinityUtils {
cctx.affinity().affinityReadyFuture(topVer).get();
+ AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+ GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+ (GridAffinityAssignment)assign0 :
+ new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
return F.t(
affinityMessage(ctx, cctx.config().getAffinity()),
affinityMessage(ctx, cctx.config().getAffinityMapper()),
- new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)));
+ assign);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
new file mode 100644
index 0000000..e502dd5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class HistoryAffinityAssignment implements AffinityAssignment {
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final List<List<ClusterNode>> assignment;
+
+ /** */
+ private final List<List<ClusterNode>> idealAssignment;
+
+ /** */
+ private final boolean clientEvtChange;
+
+ /**
+ * @param assign Assignment.
+ */
+ public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+ this.topVer = assign.topologyVersion();
+ this.assignment = assign.assignment();
+ this.idealAssignment = assign.idealAssignment();
+ this.clientEvtChange = assign.clientEventChange();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientEventChange() {
+ return clientEvtChange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> idealAssignment() {
+ return idealAssignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignment() {
+ return assignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ClusterNode> get(int part) {
+ assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+ return assignment.get(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HashSet<UUID> getIds(int part) {
+ assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+ " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+ List<ClusterNode> nodes = assignment.get(part);
+
+ HashSet<UUID> ids = U.newHashSet(nodes.size());
+
+ for (int i = 0; i < nodes.size(); i++)
+ ids.add(nodes.get(i).id());
+
+ return ids;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> primaryPartitionNodes() {
+ Set<ClusterNode> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (!F.isEmpty(nodes))
+ res.add(nodes.get(0));
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+ res.add(p);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> backupPartitions(UUID nodeId) {
+ Set<Integer> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ for (int i = 1; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ if (node.id().equals(nodeId)) {
+ res.add(p);
+
+ break;
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return topVer.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || !(o instanceof AffinityAssignment))
+ return false;
+
+ return topVer.equals(((AffinityAssignment)o).topologyVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HistoryAffinityAssignment.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1aedf4e..2890887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param node Event node.
* @param topVer Topology version.
*/
- public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
if (type == EVT_NODE_JOINED && node.isLocal()) {
// Clean-up in case of client reconnect.
registeredCaches.clear();
@@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param msg Customer message.
* @return {@code True} if minor topology version should be increased.
*/
- public boolean onCustomEvent(CacheAffinityChangeMessage msg) {
+ boolean onCustomEvent(CacheAffinityChangeMessage msg) {
assert lateAffAssign : msg;
if (msg.exchangeId() != null) {
@@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param top Topology.
* @param checkCacheId Cache ID.
*/
- public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
+ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
if (!lateAffAssign)
return;
@@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert assignment != null;
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
List<List<ClusterNode>> idealAssignment = aff.idealAssignment();
@@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
else
newAssignment = idealAssignment;
- aff.initialize(topVer, newAssignment);
+ aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
}
});
}
@@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds();
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
AffinityTopologyVersion affTopVer = aff.lastVersion();
@@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assignment.set(part, nodes);
}
- aff.initialize(topVer, assignment);
+ aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
}
else
aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.topologyVersion();
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
if (!crd) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
@@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = cacheCtx.rebalanceEnabled();
- initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary);
+ initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
}
return null;
@@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = cache.rebalanceEnabled;
- initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary);
+ initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
}
});
@@ -1240,12 +1246,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param aff Affinity.
* @param rebalanceInfo Rebalance information.
* @param latePrimary If {@code true} delays primary assignment if it is not owner.
+ * @param affCache Already calculated assignments (to reduce data stored in history).
* @throws IgniteCheckedException If failed.
*/
private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
GridAffinityAssignmentCache aff,
WaitRebalanceInfo rebalanceInfo,
- boolean latePrimary)
+ boolean latePrimary,
+ Map<Object, List<List<ClusterNode>>> affCache)
throws IgniteCheckedException
{
assert lateAffAssign;
@@ -1292,7 +1300,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAssignment == null)
newAssignment = idealAssignment;
- aff.initialize(fut.topologyVersion(), newAssignment);
+ aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+ }
+
+ /**
+ * @param aff Assignment cache.
+ * @param assign Assignment.
+ * @param affCache Assignments already calculated for other caches.
+ * @return Assignment.
+ */
+ private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
+ List<List<ClusterNode>> assign,
+ Map<Object, List<List<ClusterNode>>> affCache) {
+ List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey());
+
+ if (assign0 != null && assign0.equals(assign))
+ assign = assign0;
+ else
+ affCache.put(aff.similarAffinityKey(), assign);
+
+ return assign;
}
/**
@@ -1367,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @return Affinity assignment.
* @throws IgniteCheckedException If failed.
*/
- public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
final AffinityTopologyVersion topVer = fut.topologyVersion();
@@ -1554,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param aff Affinity cache.
* @param initAff Existing affinity cache.
*/
- public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
+ CacheHolder(boolean rebalanceEnabled,
+ GridAffinityAssignmentCache aff,
+ @Nullable GridAffinityAssignmentCache initAff) {
this.aff = aff;
if (initAff != null)
@@ -1606,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* Created cache is started on coordinator.
*/
- class CacheHolder1 extends CacheHolder {
+ private class CacheHolder1 extends CacheHolder {
/** */
private final GridCacheContext cctx;
@@ -1614,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param cctx Cache context.
* @param initAff Current affinity.
*/
- public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
+ CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff);
assert !cctx.isLocal() : cctx.name();
@@ -1651,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
/**
* Created if cache is not started on coordinator.
*/
- static class CacheHolder2 extends CacheHolder {
+ private static class CacheHolder2 extends CacheHolder {
/** */
private final GridCacheSharedContext cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index e10e5aa..4dcff9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -62,6 +62,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/**
+ * @param id Message ID.
+ */
+ public void id(IgniteUuid id) {
+ this.id = id;
+ }
+
+ /**
* @return Collection of change requests.
*/
public Collection<DynamicCacheChangeRequest> requests() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 71ae5c9..c6e7ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -48,7 +48,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1);
/** */
- public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
+ private static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
"yet or cache was already stopped): ";
/** Affinity cached function. */
@@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
* @param topVer Topology version.
* @return Affinity assignment.
*/
- public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
+ public AffinityAssignment assignment(AffinityTopologyVersion topVer) {
if (cctx.isLocal())
topVer = LOC_CACHE_TOP_VER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 00d2d16..503b334 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,7 +44,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -56,6 +58,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -64,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -71,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -80,6 +85,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -531,8 +537,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (top != null)
return top;
+ Object affKey = null;
+
+ DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+
+ if (desc != null) {
+ CacheConfiguration ccfg = desc.cacheConfiguration();
+
+ AffinityFunction aff = ccfg.getAffinity();
+
+ affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+ ccfg.getNodeFilter(),
+ ccfg.getBackups(),
+ aff.partitions());
+ }
+
GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
- top = new GridClientPartitionTopology(cctx, cacheId, exchFut));
+ top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey));
return old != null ? old : top;
}
@@ -761,40 +782,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param nodes Nodes.
* @return {@code True} if message was sent, {@code false} if node left grid.
*/
- private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
-
- boolean useOldApi = false;
-
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- useOldApi = true;
- }
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal() && cacheCtx.started()) {
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
- if (useOldApi) {
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
- locMap.nodeOrder(),
- locMap.updateSequence(),
- locMap);
- }
-
- m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
- }
- }
-
- // It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
- m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+ private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
for (ClusterNode node : nodes) {
try {
+ assert !node.equals(cctx.localNode());
+
cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignore) {
@@ -811,31 +808,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
- * @param id ID.
+ * @param nodes Target nodes.
+ * @param exchId Non-null exchange ID if message is created for exchange.
+ * @param lastVer Last version.
+ * @param compress {@code True} if it is possible to use compression for message.
+ * @return Message.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
- cctx.kernalContext().clientNode(),
- cctx.versions().last());
+ public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+ @Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable GridCacheVersion lastVer,
+ boolean compress) {
+ GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+ lastVer,
+ exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+
+ boolean useOldApi = false;
+
+ if (nodes != null) {
+ for (ClusterNode node : nodes) {
+ if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
+ useOldApi = true;
+ compress = false;
+
+ break;
+ }
+ else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+ compress = false;
+ }
+ }
+
+ m.compress(compress);
+
+ Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+ boolean ready;
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+ if (exchId != null) {
+ AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+ ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ }
+ else
+ ready = cacheCtx.started();
+
+ if (ready) {
+ GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
- m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+ if (useOldApi) {
+ locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
+ locMap.nodeOrder(),
+ locMap.updateSequence(),
+ locMap);
+ }
+
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ cacheCtx.cacheId(),
+ locMap,
+ cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+ if (exchId != null)
+ m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ }
}
}
- for (GridClientPartitionTopology top : clientTops.values()) {
- GridDhtPartitionMap2 locMap = top.localPartitionMap();
+ // It is important that client topologies be added after contexts.
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+ GridDhtPartitionFullMap map = top.partitionMap(true);
+
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ top.cacheId(),
+ map,
+ top.similarAffinityKey());
+
+ if (exchId != null)
+ m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+ }
+
+ return m;
+ }
- m.addLocalPartitionMap(top.cacheId(), locMap);
+ /**
+ * @param m Message.
+ * @param dupData Duplicated data map.
+ * @param compress {@code True} if need check for duplicated partition state data.
+ * @param cacheId Cache ID.
+ * @param map Map to add.
+ * @param affKey Cache affinity key.
+ */
+ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
+ Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
+ boolean compress,
+ Integer cacheId,
+ GridDhtPartitionFullMap map,
+ Object affKey) {
+ Integer dupDataCache = null;
+
+ if (compress && affKey != null && !m.containsCache(cacheId)) {
+ T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
+
+ if (state0 != null && state0.get2().partitionStateEquals(map)) {
+ GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(),
+ map.nodeOrder(),
+ map.updateSequence());
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet())
+ map0.put(e.getKey(), e.getValue().emptyCopy());
+
+ map = map0;
+
+ dupDataCache = state0.get1();
+ }
+ else
+ dupData.put(affKey, new T2<>(cacheId, map));
}
+ m.addFullPartitionsMap(cacheId, map, dupDataCache);
+ }
+
+ /**
+ * @param node Node.
+ * @param id ID.
+ */
+ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
+ GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
+ id,
+ cctx.kernalContext().clientNode(),
+ false);
+
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -853,6 +959,98 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param targetNode Target node.
+ * @param exchangeId ID.
+ * @param clientOnlyExchange Client exchange flag.
+ * @param sndCounters {@code True} if need send partition update counters.
+ * @return Message.
+ */
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
+ @Nullable GridDhtPartitionExchangeId exchangeId,
+ boolean clientOnlyExchange,
+ boolean sndCounters)
+ {
+ boolean compress =
+ targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0;
+
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
+ clientOnlyExchange,
+ cctx.versions().last(),
+ compress);
+
+ Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal()) {
+ GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+
+ if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+ locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+
+ addPartitionMap(m,
+ dupData,
+ compress,
+ cacheCtx.cacheId(),
+ locMap,
+ cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+ if (sndCounters)
+ m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ }
+ }
+
+ for (GridClientPartitionTopology top : clientTops.values()) {
+ if (m.partitions() != null && m.partitions().containsKey(top.cacheId()))
+ continue;
+
+ GridDhtPartitionMap2 locMap = top.localPartitionMap();
+
+ addPartitionMap(m,
+ dupData,
+ compress,
+ top.cacheId(),
+ locMap,
+ top.similarAffinityKey());
+
+ if (sndCounters)
+ m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+ }
+
+ return m;
+ }
+
+ /**
+ * @param m Message.
+ * @param dupData Duplicated data map.
+ * @param compress {@code True} if need check for duplicated partition state data.
+ * @param cacheId Cache ID.
+ * @param map Map to add.
+ * @param affKey Cache affinity key.
+ */
+ private void addPartitionMap(GridDhtPartitionsSingleMessage m,
+ Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData,
+ boolean compress,
+ Integer cacheId,
+ GridDhtPartitionMap2 map,
+ Object affKey) {
+ Integer dupDataCache = null;
+
+ if (compress) {
+ T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey);
+
+ if (state0 != null && state0.get2().equals(map.map())) {
+ dupDataCache = state0.get1();
+
+ map = map.emptyCopy();
+ }
+ else
+ dupData.put(affKey, new T2<>(cacheId, map.map()));
+ }
+
+ m.addLocalPartitionMap(cacheId, map, dupDataCache);
+ }
+
+ /**
* @param nodeId Cause node ID.
* @param topVer Topology version.
* @param evt Event type.
@@ -869,7 +1067,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
- GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
+ private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
@Nullable Collection<DynamicCacheChangeRequest> reqs,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index fd6abbd..5e777fd 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1958,8 +1958,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.template(true);
- req.deploymentId(desc.deploymentId());
-
reqs.add(req);
}
@@ -1972,6 +1970,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
batch.clientReconnect(reconnect);
+ // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
+ batch.id(null);
+
return batch;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 58933b7..5efb317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
+ /** */
+ private static final Long ZERO = 0L;
+
/** Cache shared context. */
private GridCacheSharedContext cctx;
@@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Partition update counters. */
private Map<Integer, Long> cntrMap = new HashMap<>();
+ /** */
+ private final Object similarAffKey;
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
* @param exchFut Exchange ID.
+ * @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
GridCacheSharedContext cctx,
int cacheId,
- GridDhtPartitionsExchangeFuture exchFut
+ GridDhtPartitionsExchangeFuture exchFut,
+ Object similarAffKey
) {
this.cctx = cctx;
this.cacheId = cacheId;
+ this.similarAffKey = similarAffKey;
topVer = exchFut.topologyVersion();
@@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/**
+ * @return Key to find caches with similar affinity.
+ */
+ @Nullable public Object similarAffinityKey() {
+ return similarAffKey;
+ }
+
+ /**
* @return Full map string representation.
*/
@SuppressWarnings( {"ConstantConditions"})
@@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
- return new HashMap<>(cntrMap);
+ if (skipZeros) {
+ Map<Integer, Long> res = U.newHashMap(cntrMap.size());
+
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ if (!e.getValue().equals(ZERO))
+ res.put(e.getKey(), e.getValue());
+ }
+
+ return res;
+ }
+ else
+ return new HashMap<>(cntrMap);
}
finally {
lock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 6e9b907..4ae4e47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology {
@Nullable Map<Integer, Long> cntrMap);
/**
+ * @param skipZeros If {@code true} then filters out zero counters.
* @return Partition update counters.
*/
- public Map<Integer, Long> updateCounters();
+ public Map<Integer, Long> updateCounters(boolean skipZeros);
/**
* @param part Partition to own.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 50f7f0f..f3751ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
+ /** */
+ private static final Long ZERO = 0L;
+
/** Context. */
private final GridCacheContext<?, ?> cctx;
@@ -859,7 +862,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
- GridAffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
@@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
- Map<Integer, Long> res = new HashMap<>(cntrMap);
+ Map<Integer, Long> res;
+
+ if (skipZeros) {
+ res = U.newHashMap(cntrMap.size());
+
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = e.getValue();
+
+ if (ZERO.equals(cntr))
+ continue;
+
+ res.put(e.getKey(), cntr);
+ }
+ }
+ else
+ res = new HashMap<>(cntrMap);
for (int i = 0; i < locParts.length; i++) {
GridDhtLocalPartition part = locParts[i];
@@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
continue;
Long cntr0 = res.get(part.id());
- Long cntr1 = part.updateCounter();
+ long cntr1 = part.updateCounter();
+
+ if (skipZeros && cntr1 == 0L)
+ continue;
if (cntr0 == null || cntr1 > cntr0)
res.put(part.id(), cntr1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 498d492..8f5ad17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
}
/**
+ * @param fullMap Map.
+ * @return {@code True} if this map and given map contain the same data.
+ */
+ public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) {
+ if (size() != fullMap.size())
+ return false;
+
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
+ GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+
+ if (m == null || !m.map().equals(e.getValue().map()))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param updateSeq New update sequence value.
* @return Old update sequence value.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 15b5a2e..ce36a11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -61,27 +61,24 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
private volatile int moving;
/**
- * @param nodeId Node ID.
- * @param updateSeq Update sequence number.
+ * Empty constructor required for {@link Externalizable}.
*/
- public GridDhtPartitionMap2(UUID nodeId, long updateSeq) {
- assert nodeId != null;
- assert updateSeq > 0;
-
- this.nodeId = nodeId;
- this.updateSeq = updateSeq;
-
- map = new HashMap<>();
+ public GridDhtPartitionMap2() {
+ // No-op.
}
/**
* @param nodeId Node ID.
* @param updateSeq Update sequence number.
+ * @param top Topology version.
* @param m Map to copy.
* @param onlyActive If {@code true}, then only active states will be included.
*/
- public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top,
- Map<Integer, GridDhtPartitionState> m, boolean onlyActive) {
+ public GridDhtPartitionMap2(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> m,
+ boolean onlyActive) {
assert nodeId != null;
assert updateSeq > 0;
@@ -100,10 +97,33 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
}
/**
- * Empty constructor required for {@link Externalizable}.
+ * @param nodeId Node ID.
+ * @param updateSeq Update sequence number.
+ * @param top Topology version.
+ * @param map Map.
+ * @param moving Number of moving partitions.
*/
- public GridDhtPartitionMap2() {
- // No-op.
+ private GridDhtPartitionMap2(UUID nodeId,
+ long updateSeq,
+ AffinityTopologyVersion top,
+ Map<Integer, GridDhtPartitionState> map,
+ int moving) {
+ this.nodeId = nodeId;
+ this.updateSeq = updateSeq;
+ this.top = top;
+ this.map = map;
+ this.moving = moving;
+ }
+
+ /**
+ * @return Copy with empty partition state map.
+ */
+ public GridDhtPartitionMap2 emptyCopy() {
+ return new GridDhtPartitionMap2(nodeId,
+ updateSeq,
+ top,
+ U.<Integer, GridDhtPartitionState>newHashMap(0),
+ 0);
}
/**
@@ -277,9 +297,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
long ver = in.readLong();
int minorVer = in.readInt();
- if (ver != 0) {
+ if (ver != 0)
top = new AffinityTopologyVersion(ver, minorVer);
- }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 4e714ed..6e69161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable;
/**
* Request for single partition info.
*/
-abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+ /** */
+ public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
+
+ /** */
+ protected static final byte COMPRESSED_FLAG_MASK = 1;
+
/** */
private static final long serialVersionUID = 0L;
@@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
/** Last used cache version. */
private GridCacheVersion lastVer;
+ /** */
+ private byte flags;
+
/**
* Required by {@link Externalizable}.
*/
@@ -79,6 +89,20 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
return lastVer;
}
+ /**
+ * @return {@code True} if message data is compressed.
+ */
+ protected final boolean compressed() {
+ return (flags & COMPRESSED_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param compressed {@code True} if message data is compressed.
+ */
+ protected final void compressed(boolean compressed) {
+ flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -101,6 +125,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
writer.incrementState();
case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
if (!writer.writeMessage("lastVer", lastVer))
return false;
@@ -131,6 +161,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
reader.incrementState();
case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
lastVer = reader.readMessage("lastVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 80b3768..f391265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
- cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+ cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
}
top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (top.cacheId() == cacheCtx.cacheId()) {
cacheCtx.topology().update(exchId,
top.partitionMap(true),
- top.updateCounters());
+ top.updateCounters(false));
break;
}
@@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (!centralizedAff)
- sendLocalPartitions(crd, exchId);
+ sendLocalPartitions(crd);
initDone();
@@ -928,27 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param node Node.
- * @param id ID.
* @throws IgniteCheckedException If failed.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+ private void sendLocalPartitions(ClusterNode node)
throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node,
+ exchangeId(),
clientOnlyExchange,
- cctx.versions().last());
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
-
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
-
- m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-
- m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
- }
- }
+ true);
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -964,51 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param nodes Target nodes.
- * @return Message;
+ * @param compress {@code True} if it is possible to use compression for message.
+ * @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
GridCacheVersion last = lastVer.get();
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+ return cctx.exchange().createPartitionsFullMessage(nodes,
+ exchangeId(),
last != null ? last : cctx.versions().last(),
- topologyVersion());
-
- boolean useOldApi = false;
-
- if (nodes != null) {
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- useOldApi = true;
- }
- }
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
- boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
-
- if (ready) {
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
- if (useOldApi)
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
-
- m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
- }
- }
- }
-
- // It is important that client topologies be added after contexts.
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
-
- m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
- }
-
- return m;
+ compress);
}
/**
@@ -1016,7 +970,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
+
+ assert !nodes.contains(cctx.localNode());
if (log.isDebugEnabled())
log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
@@ -1030,7 +986,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void sendPartitions(ClusterNode oldestNode) {
try {
- sendLocalPartitions(oldestNode, exchId);
+ sendLocalPartitions(oldestNode);
}
catch (ClusterTopologyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -1234,7 +1190,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);