You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/03/29 13:33:07 UTC
[ignite] branch master updated: IGNITE-11465 Multiple client
leave/join events may wipe affinity assignment history and cause
transactions fail - Fixes #6217.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e2c198d IGNITE-11465 Multiple client leave/join events may wipe affinity assignment history and cause transactions fail - Fixes #6217.
e2c198d is described below
commit e2c198d59842e048f04486affc29d64cc0e4299a
Author: Ivan Rakov <ir...@apache.org>
AuthorDate: Fri Mar 29 16:28:38 2019 +0300
IGNITE-11465 Multiple client leave/join events may wipe affinity assignment history and cause transactions fail - Fixes #6217.
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../jol/GridAffinityAssignmentJolBenchmark.java | 5 +-
.../affinity/GridAffinityAssignmentCache.java | 108 +++++--
.../affinity/HistoryAffinityAssignment.java | 358 ++-------------------
...ent.java => HistoryAffinityAssignmentImpl.java} | 18 +-
.../HistoryAffinityAssignmentShallowCopy.java | 107 ++++++
.../cache/affinity/AffinityHistoryCleanupTest.java | 58 +++-
.../GridAffinityProcessorMemoryLeakTest.java | 27 +-
.../GridHistoryAffinityAssignmentTest.java | 4 +-
.../cache/CacheNoAffinityExchangeTest.java | 85 +++++
9 files changed, 381 insertions(+), 389 deletions(-)
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
index b89cbe6..f154341 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2;
import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.affinity.HistoryAffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.HistoryAffinityAssignmentImpl;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -252,7 +253,7 @@ public class GridAffinityAssignmentJolBenchmark {
AffinityTopologyVersion topVer = new AffinityTopologyVersion(i + 1, 0);
GridAffinityAssignmentV2 a = new GridAffinityAssignmentV2(topVer, lateAssignmemnt, idealAssignment);
- HistoryAffinityAssignment h = new HistoryAffinityAssignment(a, backups);
+ HistoryAffinityAssignment h = new HistoryAffinityAssignmentImpl(a, backups);
if (!lateAssignmemnt.equals(h.assignment()))
throw new RuntimeException();
@@ -273,7 +274,7 @@ public class GridAffinityAssignmentJolBenchmark {
}
GridAffinityAssignmentV2 a0 = new GridAffinityAssignmentV2(topVer0, assignment, idealAssignment);
- HistoryAffinityAssignment h0 = new HistoryAffinityAssignment(a0, backups);
+ HistoryAffinityAssignment h0 = new HistoryAffinityAssignmentImpl(a0, backups);
if (!assignment.equals(h0.assignment()))
throw new RuntimeException();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 0335552..0f468ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -66,7 +66,10 @@ import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVE
*/
public class GridAffinityAssignmentCache {
/** Cleanup history size. */
- private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
+ private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 50);
+
+ /** Cleanup history links size (calculated by both real entries and shallow copies). */
+ private final int MAX_HIST_LINKS_SIZE = MAX_HIST_SIZE * 10;
/** Partition distribution. */
private final float partDistribution = getFloat(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 50f);
@@ -207,7 +210,9 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignmentV2 assignment = new GridAffinityAssignmentV2(topVer, affAssignment, idealAssignment);
- HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment, backups));
+ HistoryAffinityAssignmentImpl newHistEntry = new HistoryAffinityAssignmentImpl(assignment, backups);
+
+ HistoryAffinityAssignment existing = affCache.put(topVer, newHistEntry);
head.set(assignment);
@@ -221,9 +226,7 @@ public class GridAffinityAssignmentCache {
}
}
- // In case if value was replaced there is no sense to clean the history.
- if (hAff == null)
- onHistoryAdded();
+ onHistoryAdded(existing, newHistEntry);
if (log.isTraceEnabled()) {
log.trace("New affinity assignment [grp=" + cacheOrGrpName
@@ -491,7 +494,17 @@ public class GridAffinityAssignmentCache {
GridAffinityAssignmentV2 assignmentCpy = new GridAffinityAssignmentV2(topVer, aff);
- HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy, backups));
+ AffinityTopologyVersion prevVer = topVer.minorTopologyVersion() == 0 ?
+ new AffinityTopologyVersion(topVer.topologyVersion() - 1, Integer.MAX_VALUE) :
+ new AffinityTopologyVersion(topVer.topologyVersion(), topVer.minorTopologyVersion() - 1);
+
+ Map.Entry<AffinityTopologyVersion, HistoryAffinityAssignment> prevHistEntry = affCache.floorEntry(prevVer);
+
+ HistoryAffinityAssignment newHistEntry = (prevHistEntry == null) ?
+ new HistoryAffinityAssignmentImpl(assignmentCpy, backups) :
+ new HistoryAffinityAssignmentShallowCopy(prevHistEntry.getValue().origin(), topVer);
+
+ HistoryAffinityAssignment existing = affCache.put(topVer, newHistEntry);
head.set(assignmentCpy);
@@ -505,9 +518,7 @@ public class GridAffinityAssignmentCache {
}
}
- // In case if value was replaced there is no sense to clean the history.
- if (hAff == null)
- onHistoryAdded();
+ onHistoryAdded(existing, newHistEntry);
}
/**
@@ -680,10 +691,14 @@ public class GridAffinityAssignmentCache {
/**
* Get cached affinity for specified topology version.
*
- * @param topVer Topology version.
+ * @param topVer Topology version for which affinity assignment is requested.
+ * @param lastAffChangeTopVer Topology version of last affinity assignment change.
* @return Cached affinity.
*/
- public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer, AffinityTopologyVersion lastAffChangeTopVer) {
+ public AffinityAssignment cachedAffinity(
+ AffinityTopologyVersion topVer,
+ AffinityTopologyVersion lastAffChangeTopVer
+ ) {
if (topVer.equals(AffinityTopologyVersion.NONE))
topVer = lastAffChangeTopVer = lastVersion();
else {
@@ -705,11 +720,23 @@ public class GridAffinityAssignmentCache {
if (e != null)
cache = e.getValue();
- if (cache == null || cache.topologyVersion().compareTo(topVer) > 0) {
+ if (cache == null) {
+ throw new IllegalStateException("Getting affinity for too old topology version that is already " +
+ "out of history [locNode=" + ctx.discovery().localNode() +
+ ", grp=" + cacheOrGrpName +
+ ", topVer=" + topVer +
+ ", lastAffChangeTopVer=" + lastAffChangeTopVer +
+ ", head=" + head.get().topologyVersion() +
+ ", history=" + affCache.keySet() +
+ ']');
+ }
+
+ if (cache.topologyVersion().compareTo(topVer) > 0) {
throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " +
"calculated [locNode=" + ctx.discovery().localNode() +
", grp=" + cacheOrGrpName +
", topVer=" + topVer +
+ ", lastAffChangeTopVer=" + lastAffChangeTopVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
']');
@@ -810,23 +837,62 @@ public class GridAffinityAssignmentCache {
/**
* Cleaning the affinity history.
+ *
+ * @param replaced Replaced entry in case history item was already present, null otherwise.
+ * @param added New history item.
*/
- private void onHistoryAdded() {
- if (fullHistSize.incrementAndGet() > MAX_HIST_SIZE) {
- Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
+ private void onHistoryAdded(
+ HistoryAffinityAssignment replaced,
+ HistoryAffinityAssignment added
+ ) {
+ boolean cleanupNeeded = false;
- int rmvCnt = MAX_HIST_SIZE / 2;
+ if (replaced == null) {
+ cleanupNeeded = true;
+
+ if (added.requiresHistoryCleanup())
+ fullHistSize.incrementAndGet();
+ }
+ else {
+ if (replaced.requiresHistoryCleanup() != added.requiresHistoryCleanup()) {
+ if (added.requiresHistoryCleanup()) {
+ cleanupNeeded = true;
+
+ fullHistSize.incrementAndGet();
+ }
+ else
+ fullHistSize.decrementAndGet();
+ }
+ }
+
+ if (!cleanupNeeded)
+ return;
+
+ int fullSize = fullHistSize.get();
+
+ int linksSize = affCache.size();
+
+ int fullRmvCnt = fullSize > MAX_HIST_SIZE ? (MAX_HIST_SIZE / 2) : 0;
+
+ int linksRmvCnt = linksSize > MAX_HIST_LINKS_SIZE ? (MAX_HIST_LINKS_SIZE / 2) : 0;
+
+ if (fullRmvCnt > 0 || linksRmvCnt > 0) {
+ Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
AffinityTopologyVersion topVerRmv = null;
- while (it.hasNext() && rmvCnt > 0) {
- AffinityAssignment aff0 = it.next();
+ while (it.hasNext() && (fullRmvCnt > 0 || linksRmvCnt > 0)) {
+ HistoryAffinityAssignment aff0 = it.next();
- it.remove();
+ if (aff0.requiresHistoryCleanup()) { // Don't decrement counter in case of fullHistoryCleanupRequired copy remove.
+ fullRmvCnt--;
+
+ fullHistSize.decrementAndGet();
+ }
- rmvCnt--;
+ linksRmvCnt--;
- fullHistSize.decrementAndGet();
+ it.remove();
topVerRmv = aff0.topologyVersion();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index c6c0783..19bd88c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -1,343 +1,37 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.ignite.internal.processors.affinity;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.BitSetIntSet;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
/**
- *
+ * Interface for historical calculated affinity assignment.
*/
-@SuppressWarnings("ForLoopReplaceableByForEach")
-public class HistoryAffinityAssignment implements AffinityAssignment {
- /** */
- private final AffinityTopologyVersion topVer;
-
- /** */
- private final List<List<ClusterNode>> assignment;
-
- /** */
- private final List<List<ClusterNode>> idealAssignment;
-
- /** */
- private final ClusterNode[] nodes;
-
- /** Ideal assignments are stored as sequences of indexes in nodes array. */
- private final char[] idealParts;
-
- /** Diff with ideal. */
- private final Map<Integer, char[]> assignmentDiff;
-
+public interface HistoryAffinityAssignment extends AffinityAssignment {
/**
- * @param assign Assignment.
- * @param backups Backups.
+ * Should return true if instance is "heavy" and should be taken into account during history size management.
+ *
+ * @return <code>true</code> if adding this instance to history should trigger size check and possible cleanup.
*/
- public HistoryAffinityAssignment(AffinityAssignment assign, int backups) {
- topVer = assign.topologyVersion();
-
- if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > IGNITE_AFFINITY_BACKUPS_THRESHOLD) {
- assignment = assign.assignment();
-
- idealAssignment = assign.idealAssignment();
-
- nodes = null;
-
- idealParts = null;
-
- assignmentDiff = null;
-
- return;
- }
-
- List<List<ClusterNode>> assignment = assign.assignment();
- List<List<ClusterNode>> idealAssignment = assign.idealAssignment();
-
- int min = Integer.MAX_VALUE;
- int max = 0;
-
- for (List<ClusterNode> nodes : idealAssignment) { // Estimate required size.
- int size = nodes.size();
-
- if (size > max)
- max = size;
-
- if (size < min)
- min = size;
- }
-
- if (max != min) {
- this.assignment = assign.assignment();
-
- this.idealAssignment = assign.idealAssignment();
-
- nodes = null;
-
- idealParts = null;
-
- assignmentDiff = null;
-
- return;
- }
-
- int cpys = max;
-
- boolean same = assignment == idealAssignment;
-
- int partsCnt = assignment.size();
-
- idealParts = new char[partsCnt * cpys];
-
- Map<ClusterNode, Character> orderMap = new HashMap<>();
-
- char order = 1; // Char type is used as unsigned short to avoid conversions.
-
- assignmentDiff = new HashMap<>();
-
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> curr = assignment.get(p);
- List<ClusterNode> ideal = idealAssignment.get(p);
-
- for (int i = 0; i < ideal.size(); i++) {
- ClusterNode node = ideal.get(i);
-
- Character nodeOrder = orderMap.get(node);
-
- if (nodeOrder == null)
- orderMap.put(node, (nodeOrder = order++));
-
- idealParts[p * cpys + i] = nodeOrder;
- }
-
- if (!same && !curr.equals(ideal)) {
- char[] idx = new char[curr.size()];
-
- assignmentDiff.put(p, idx);
-
- for (int i = 0; i < curr.size(); i++) {
- ClusterNode node = curr.get(i);
-
- Character nodeOrder = orderMap.get(node);
-
- if (nodeOrder == null)
- orderMap.put(node, (nodeOrder = order++));
-
- idx[i] = nodeOrder;
- }
- }
- }
-
- // Fill array according to assigned order.
- nodes = orderMap.keySet().stream().toArray(ClusterNode[]::new);
-
- Arrays.sort(nodes, (o1, o2) -> orderMap.get(o1).compareTo(orderMap.get(o2)));
-
- this.idealAssignment = new AbstractList<List<ClusterNode>>() {
- @Override public List<ClusterNode> get(int idx) {
- return partitionNodes(idx, true, cpys);
- }
-
- @Override public int size() {
- return partsCnt;
- }
- };
-
- this.assignment = same ? this.idealAssignment : new AbstractList<List<ClusterNode>>() {
- @Override public List<ClusterNode> get(int idx) {
- return partitionNodes(idx, false, cpys);
- }
-
- @Override public int size() {
- return partsCnt;
- }
- };
-
- assert this.assignment.equals(assign.assignment()) : "new=" + this.assignment + ", old=" + assign.assignment();
-
- assert this.idealAssignment.equals(assign.idealAssignment()) :
- "new=" + this.idealAssignment + ", old=" + assign.idealAssignment();
- }
+ public boolean requiresHistoryCleanup();
/**
- * @param p Partion.
- * @param ideal {@code True} for ideal assignment.
- * @param cpys Copies.
+ * In case this instance is lightweight wrapper of another instance, this method should return reference
+ * to an original one. Otherwise, it should return <code>this</code> reference.
+ *
+ * @return Original instance of <code>this</code> if not applicable.
*/
- private List<ClusterNode> partitionNodes(int p, boolean ideal, int cpys) {
- char[] order;
-
- if (!ideal && (order = assignmentDiff.get(p)) != null) {
- List<ClusterNode> ret = new ArrayList<>(order.length);
-
- for (int i = 0; i < order.length; i++)
- ret.add(nodes[order[i] - 1]);
-
- return ret;
- }
-
- List<ClusterNode> ret = new ArrayList<>(cpys);
-
- for (int i = 0; i < cpys; i++) {
- char ord = idealParts[p * cpys + i];
-
- if (ord == 0) // Zero
- break;
-
- ret.add(nodes[ord - 1]);
- }
-
- return ret;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public List<List<ClusterNode>> idealAssignment() {
- return idealAssignment;
- }
-
- /** {@inheritDoc} */
- @Override public List<List<ClusterNode>> assignment() {
- return assignment;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public List<ClusterNode> get(int part) {
- assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
- " [part=" + part + ", partitions=" + assignment.size() + ']';
-
- return assignment.get(part);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<UUID> getIds(int part) {
- assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
- " [part=" + part + ", partitions=" + assignment.size() + ']';
-
- if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION)
- return assignments2ids(assignment.get(part));
- else {
- List<ClusterNode> nodes = assignment.get(part);
-
- return nodes.size() > AffinityAssignment.IGNITE_AFFINITY_BACKUPS_THRESHOLD
- ? assignments2ids(nodes)
- : F.viewReadOnly(nodes, F.node2id());
- }
- }
-
- /** {@inheritDoc} */
- @Override public Set<ClusterNode> nodes() {
- Set<ClusterNode> res = new HashSet<>();
-
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> nodes = assignment.get(p);
-
- if (!F.isEmpty(nodes))
- res.addAll(nodes);
- }
-
- return Collections.unmodifiableSet(res);
- }
-
- /** {@inheritDoc} */
- @Override public Set<ClusterNode> primaryPartitionNodes() {
- Set<ClusterNode> res = new HashSet<>();
-
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> nodes = assignment.get(p);
-
- if (!F.isEmpty(nodes))
- res.add(nodes.get(0));
- }
-
- return Collections.unmodifiableSet(res);
- }
-
- /** {@inheritDoc} */
- @Override public Set<Integer> primaryPartitions(UUID nodeId) {
- Set<Integer> res = IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION ? new HashSet<>() : new BitSetIntSet();
-
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> nodes = assignment.get(p);
-
- if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
- res.add(p);
- }
-
- return Collections.unmodifiableSet(res);
- }
-
- /** {@inheritDoc} */
- @Override public Set<Integer> backupPartitions(UUID nodeId) {
- Set<Integer> res = IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION ? new HashSet<>() : new BitSetIntSet();
-
- for (int p = 0; p < assignment.size(); p++) {
- List<ClusterNode> nodes = assignment.get(p);
-
- for (int i = 1; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
-
- if (node.id().equals(nodeId)) {
- res.add(p);
-
- break;
- }
- }
- }
-
- return Collections.unmodifiableSet(res);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return topVer.hashCode();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean equals(Object o) {
- if (o == this)
- return true;
-
- if (o == null || !(o instanceof AffinityAssignment))
- return false;
-
- return topVer.equals(((AffinityAssignment)o).topologyVersion());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HistoryAffinityAssignment.class, this);
- }
+ public HistoryAffinityAssignment origin();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
similarity index 95%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
index c6c0783..1eda706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java
@@ -34,10 +34,10 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
- *
+ * Heap-space optimized version of calculated affinity assignment.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
-public class HistoryAffinityAssignment implements AffinityAssignment {
+public class HistoryAffinityAssignmentImpl implements HistoryAffinityAssignment {
/** */
private final AffinityTopologyVersion topVer;
@@ -60,7 +60,7 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
* @param assign Assignment.
* @param backups Backups.
*/
- public HistoryAffinityAssignment(AffinityAssignment assign, int backups) {
+ public HistoryAffinityAssignmentImpl(AffinityAssignment assign, int backups) {
topVer = assign.topologyVersion();
if (IGNITE_DISABLE_AFFINITY_MEMORY_OPTIMIZATION || backups > IGNITE_AFFINITY_BACKUPS_THRESHOLD) {
@@ -320,6 +320,16 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
}
/** {@inheritDoc} */
+ @Override public boolean requiresHistoryCleanup() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HistoryAffinityAssignment origin() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
@Override public int hashCode() {
return topVer.hashCode();
}
@@ -338,6 +348,6 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(HistoryAffinityAssignment.class, this);
+ return S.toString(HistoryAffinityAssignmentImpl.class, this);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
new file mode 100644
index 0000000..4fcea72
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java
@@ -0,0 +1,107 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Shallow copy that contains reference to delegate {@link HistoryAffinityAssignment}.
+ */
+public class HistoryAffinityAssignmentShallowCopy implements HistoryAffinityAssignment {
+ /** History assignment. */
+ private final HistoryAffinityAssignment histAssignment;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * @param histAssignment History assignment.
+ * @param topVer Topology version.
+ */
+ public HistoryAffinityAssignmentShallowCopy(
+ HistoryAffinityAssignment histAssignment,
+ AffinityTopologyVersion topVer
+ ) {
+ this.histAssignment = histAssignment;
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean requiresHistoryCleanup() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> idealAssignment() {
+ return histAssignment.idealAssignment();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignment() {
+ return histAssignment.assignment();
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ClusterNode> get(int part) {
+ return histAssignment.get(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<UUID> getIds(int part) {
+ return histAssignment.getIds(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> nodes() {
+ return histAssignment.nodes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> primaryPartitionNodes() {
+ return histAssignment.primaryPartitionNodes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+ return histAssignment.primaryPartitions(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> backupPartitions(UUID nodeId) {
+ return histAssignment.backupPartitions(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HistoryAffinityAssignment origin() {
+ return histAssignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HistoryAffinityAssignmentShallowCopy.class, this);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
index 6de9228..12096f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -119,11 +118,13 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
+ topVer(2, 1), // FullHistSize = 3.
+ topVer(3, 0), // FullHistSize = 4.
topVer(3, 1), // FullHistSize = 5.
topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
topVer(4, 1), // FullHistSize = 5.
- topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(6, 0)), // FullHistSize = 5.
+ topVer(5, 0), // Client event ->FullHistSize = 5.
+ topVer(6, 0)), // Client event ->FullHistSize = 5.
5);
startGrid(4);
@@ -131,11 +132,15 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
+ topVer(2, 1), // FullHistSize = 3.
+ topVer(3, 0), // FullHistSize =4.
+ topVer(3, 1), // FullHistSize = 5.
+ topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
topVer(4, 1), // FullHistSize = 5.
- topVer(5, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(6, 0), // FullHistSize = 5.
- topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(8, 0)), // FullHistSize = 5.
+ topVer(5, 0), // Client event -> FullHistSize = 5.
+ topVer(6, 0), // Client event -> FullHistSize = 5.
+ topVer(7, 0), // Client event -> FullHistSize = 5.
+ topVer(8, 0)), // Client event ->FullHistSize = 5.
5);
startGrid(4);
@@ -143,11 +148,17 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
stopGrid(4);
checkHistory(ignite, F.asList(
- topVer(6, 0), // FullHistSize = 5.
- topVer(7, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(8, 0), // FullHistSize = 5.
- topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(10, 0)), // FullHistSize = 5.
+ topVer(2, 1), // FullHistSize = 3.
+ topVer(3, 0), // FullHistSize = 4.
+ topVer(3, 1), // FullHistSize =5.
+ topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(4, 1), // FullHistSize = 5.
+ topVer(5, 0), // Client event -> FullHistSize = 5.
+ topVer(6, 0), // Client event -> FullHistSize = 5.
+ topVer(7, 0), // Client event -> FullHistSize = 5.
+ topVer(8, 0), // Client event -> FullHistSize = 5.
+ topVer(9, 0), // Client event -> FullHistSize = 5.
+ topVer(10, 0)), // Client event ->FullHistSize = 5.
5);
client = false;
@@ -155,12 +166,29 @@ public class AffinityHistoryCleanupTest extends GridCommonAbstractTest {
startGrid(4);
checkHistory(ignite, F.asList(
- topVer(8, 0), // FullHistSize = 5.
- topVer(9, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
- topVer(10, 0), // FullHistSize = 5.
+ topVer(3, 1), // FullHistSize = 5.
+ topVer(4, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(4, 1), // FullHistSize = 5.
+ topVer(5, 0), // Client event -> FullHistSize = 5.
+ topVer(6, 0), // Client event -> FullHistSize = 5.
+ topVer(7, 0), // Client event -> FullHistSize = 5.
+ topVer(8, 0), // Client event -> FullHistSize = 5.
+ topVer(9, 0), // Client event -> FullHistSize = 5.
+ topVer(10, 0), // Client event -> FullHistSize = 5.
topVer(11, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
topVer(11, 1)), // FullHistSize = 5.
5);
+
+ stopGrid(4);
+
+ startGrid(4);
+
+ checkHistory(ignite, F.asList(
+ topVer(11, 1), // FullHistSize = 5.
+ topVer(12, 0), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ topVer(13, 0), // FullHistSize = 5.
+ topVer(13, 1)), // FullHistSize = (6 - IGNITE_AFFINITY_HISTORY_SIZE(5)/2) = 4.
+ 4);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
index b841a87..1bb309f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
@@ -21,27 +21,23 @@ import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
-
/**
* Tests for {@link GridAffinityProcessor}.
*/
@GridCommonTest(group = "Affinity Processor")
public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest {
- /** Max value for affinity history size name. Should be the same as in GridAffinityAssignmentCache.MAX_HIST_SIZE */
- private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
-
/** Cache name. */
private static final String CACHE_NAME = "cache";
@@ -75,7 +71,10 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
* @throws Exception In case of any exception.
*/
@Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE, value = "10")
public void testAffinityProcessor() throws Exception {
+ int maxHistSize = 10;
+
Ignite ignite = startGrid(0);
IgniteKernal grid = (IgniteKernal)grid(0);
@@ -86,21 +85,23 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
IgniteDataStreamer<String, String> globalStreamer;
- int count = MAX_HIST_SIZE * 4;
+ int cnt = maxHistSize * 30;
+
+ int expLimit = cnt / 2;
int size;
do {
try {
- cache = createLocalCache(ignite, count);
+ cache = createLocalCache(ignite, cnt);
- cache.put("Key" + count, "Value" + count);
+ cache.put("Key" + cnt, "Value" + cnt);
cache.destroy();
globalStreamer = createGlobalStreamer(ignite, globalCache);
- globalStreamer.addData("GlobalKey" + count, "GlobalValue" + count);
+ globalStreamer.addData("GlobalKey" + cnt, "GlobalValue" + cnt);
globalStreamer.flush();
@@ -108,14 +109,14 @@ public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest
size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size();
- assertTrue("Cache has size that bigger then expected [size=" + size + "" +
- ", expLimit=" + MAX_HIST_SIZE * 3 + "]", size < MAX_HIST_SIZE * 3);
+ assertTrue("Cache has size that bigger then expected [size=" + size +
+ ", expLimit=" + expLimit + "]", size < expLimit);
}
catch (Exception e) {
fail("Error was handled [" + e.getMessage() + "]");
}
}
- while (count-- > 0);
+ while (cnt-- > 0);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
index 9fa209a..cf6eb68 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java
@@ -62,13 +62,13 @@ public class GridHistoryAffinityAssignmentTest extends GridCommonAbstractTest {
AffinityTopologyVersion topVer = new AffinityTopologyVersion(1, 0);
HistoryAffinityAssignment lateAssign =
- new HistoryAffinityAssignment(new GridAffinityAssignmentV2(topVer, curr, ideal), 1);
+ new HistoryAffinityAssignmentImpl(new GridAffinityAssignmentV2(topVer, curr, ideal), 1);
assertEquals("Late", curr, lateAssign.assignment());
assertEquals("Ideal late", ideal, lateAssign.idealAssignment());
HistoryAffinityAssignment idealAssign = new
- HistoryAffinityAssignment(new GridAffinityAssignmentV2(topVer, ideal, ideal), 1);
+ HistoryAffinityAssignmentImpl(new GridAffinityAssignmentV2(topVer, ideal, ideal), 1);
assertSame(idealAssign.assignment(), idealAssign.idealAssignment());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
index 66013b1..61d6c06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
@@ -19,17 +19,23 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
@@ -45,7 +51,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessag
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
/**
@@ -77,6 +85,11 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(new TestDiscoverySpi().setIpFinder(IP_FINDER));
+ cfg.setActiveOnStart(false);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024)));
+
if (startClient) {
cfg.setClientMode(true);
@@ -91,6 +104,8 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
stopAllGrids();
+ startClient = false;
+
super.afterTest();
}
@@ -222,6 +237,8 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
try {
Ignite ig = startGridsMultiThreaded(4);
+ ig.cluster().active(true);
+
IgniteCache<Integer, Integer> atomicCache = ig.createCache(new CacheConfiguration<Integer, Integer>()
.setName("atomic").setAtomicityMode(CacheAtomicityMode.ATOMIC).setCacheMode(CacheMode.REPLICATED));
@@ -287,6 +304,74 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
}
/**
+ * Tests that multiple client events won't fail transactions due to affinity assignment history expiration.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE, value = "10")
+ public void testMulipleClientLeaveJoin() throws Exception {
+ Ignite ig = startGrids(2);
+
+ ig.cluster().active(true);
+
+ startClient = true;
+
+ IgniteEx stableClient = startGrid(2);
+
+ IgniteCache<Integer, Integer> stableClientTxCacheProxy = stableClient.createCache(
+ new CacheConfiguration<Integer, Integer>()
+ .setName("tx")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+ awaitPartitionMapExchange();
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ for (int i = 0; i < 10; i++) {
+ try {
+ startGrid(3);
+
+ stopGrid(3);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ });
+
+ CountDownLatch clientTxLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture loadFut = GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ try (Transaction tx = stableClient.transactions().txStart()) {
+ ThreadLocalRandom r = ThreadLocalRandom.current();
+
+ stableClientTxCacheProxy.put(r.nextInt(100), r.nextInt());
+
+ try {
+ clientTxLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+
+ tx.commit();
+ }
+ }
+ });
+
+ fut.get();
+
+ clientTxLatch.countDown();
+
+ loadFut.get();
+ }
+
+ /**
*
*/
public static class TestDiscoverySpi extends TcpDiscoverySpi {