You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/04/13 11:55:50 UTC
ignite git commit: ignite-3018 Cache affinity calculation is slow
with large nodes number
Repository: ignite
Updated Branches:
refs/heads/master 76485fc3e -> 027b2c27c
ignite-3018 Cache affinity calculation is slow with large nodes number
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/027b2c27
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/027b2c27
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/027b2c27
Branch: refs/heads/master
Commit: 027b2c27c8824ef1498883dff3af9d5be37a80b5
Parents: 76485fc
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Apr 13 14:55:39 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 14:55:39 2017 +0300
----------------------------------------------------------------------
.../rendezvous/RendezvousAffinityFunction.java | 283 +++--
.../GridCachePartitionExchangeManager.java | 20 +-
...inityFunctionFastPowerOfTwoHashSelfTest.java | 17 -
...ndezvousAffinityFunctionSimpleBenchmark.java | 1100 ++++++++++++++++++
...ousAffinityFunctionStandardHashSelfTest.java | 17 -
.../IgniteClientReconnectCacheTest.java | 16 +-
.../internal/binary/BinaryEnumsSelfTest.java | 2 +
.../GridCachePartitionedAffinitySpreadTest.java | 169 ---
...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +-
...ridCachePartitionNotLoadedEventSelfTest.java | 2 +
.../near/GridCacheNearTxForceKeyTest.java | 6 +-
...cheRebalancingPartitionDistributionTest.java | 2 +-
...gniteServiceConfigVariationsFullApiTest.java | 9 +-
.../IgniteServiceDynamicCachesSelfTest.java | 12 +-
.../ignite/testframework/GridTestNode.java | 12 +-
...PartitionOnAffinityRunAtomicCacheOpTest.java | 46 +-
16 files changed, 1364 insertions(+), 351 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index dcac7d4..9c84f00 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -22,18 +22,16 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
@@ -84,20 +81,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/** Comparator. */
private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
- /** Thread local message digest. */
- private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
- @Override protected MessageDigest initialValue() {
- try {
- return MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e) {
- assert false : "Should have failed in constructor";
-
- throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
- }
- }
- };
-
/** Number of partitions. */
private int parts;
@@ -121,10 +104,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/** Hash ID resolver. */
private AffinityNodeHashResolver hashIdRslvr = null;
- /** Ignite instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
/** Logger instance. */
@LoggerResource
private transient IgniteLogger log;
@@ -195,13 +174,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
setPartitions(parts);
this.backupFilter = backupFilter;
-
- try {
- MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e) {
- throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
- }
}
/**
@@ -382,116 +354,94 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Returns collection of nodes (primary first) for specified partition.
*
- * @param d Message digest.
* @param part Partition.
* @param nodes Nodes.
- * @param nodesHash Serialized nodes hashes.
* @param backups Number of backups.
* @param neighborhoodCache Neighborhood.
* @return Assignment.
*/
- public List<ClusterNode> assignPartition(MessageDigest d,
- int part,
+ public List<ClusterNode> assignPartition(int part,
List<ClusterNode> nodes,
- Map<ClusterNode, byte[]> nodesHash,
int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
if (nodes.size() <= 1)
return nodes;
- if (d == null)
- d = digest.get();
-
- List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
-
- try {
- for (int i = 0; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
+ IgniteBiTuple<Long, ClusterNode> [] hashArr =
+ (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()];
- byte[] nodeHashBytes = nodesHash.get(node);
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
- if (nodeHashBytes == null) {
- Object nodeHash = resolveNodeHash(node);
+ Object nodeHash = resolveNodeHash(node);
- byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+ long hash = hash(nodeHash.hashCode(), part);
- // Add 4 bytes for partition bytes.
- nodeHashBytes = new byte[nodeHashBytes0.length + 4];
-
- System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
-
- nodesHash.put(node, nodeHashBytes);
- }
-
- U.intToBytes(part, nodeHashBytes, 0);
-
- d.reset();
-
- byte[] bytes = d.digest(nodeHashBytes);
+ hashArr[i] = F.t(hash, node);
+ }
- long hash =
- (bytes[0] & 0xFFL)
- | ((bytes[1] & 0xFFL) << 8)
- | ((bytes[2] & 0xFFL) << 16)
- | ((bytes[3] & 0xFFL) << 24)
- | ((bytes[4] & 0xFFL) << 32)
- | ((bytes[5] & 0xFFL) << 40)
- | ((bytes[6] & 0xFFL) << 48)
- | ((bytes[7] & 0xFFL) << 56);
+ final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
- lst.add(F.t(hash, node));
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups);
- Collections.sort(lst, COMPARATOR);
+ // REPLICATED cache case
+ if (backups == Integer.MAX_VALUE)
+ return replicatedAssign(nodes, sortedNodes);
- int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+ Iterator<ClusterNode> it = sortedNodes.iterator();
List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
- ClusterNode primary = lst.get(0).get2();
+ Collection<ClusterNode> allNeighbors = new HashSet<>();
+
+ ClusterNode primary = it.next();
res.add(primary);
+ if (exclNeighbors)
+ allNeighbors.addAll(neighborhoodCache.get(primary.id()));
+
// Select backups.
if (backups > 0) {
- for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
- IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
-
- ClusterNode node = next.get2();
+ while (it.hasNext() && res.size() < primaryAndBackups) {
+ ClusterNode node = it.next();
if (exclNeighbors) {
- Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
-
- if (!allNeighbors.contains(node))
+ if (!allNeighbors.contains(node)) {
res.add(node);
+
+ allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ }
+ }
+ else if ((backupFilter != null && backupFilter.apply(primary, node))
+ || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+ || (affinityBackupFilter == null && backupFilter == null) ) {
+ res.add(node);
+
+ if (exclNeighbors)
+ allNeighbors.addAll(neighborhoodCache.get(node.id()));
}
- else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
- res.add(next.get2());
- else if (backupFilter != null && backupFilter.apply(primary, node))
- res.add(next.get2());
- else if (affinityBackupFilter == null && backupFilter == null)
- res.add(next.get2());
}
}
if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
// Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
- for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
- IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+ it = sortedNodes.iterator();
+
+ it.next();
- ClusterNode node = next.get2();
+ while (it.hasNext() && res.size() < primaryAndBackups) {
+ ClusterNode node = it.next();
if (!res.contains(node))
- res.add(next.get2());
+ res.add(node);
}
if (!exclNeighborsWarn) {
LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
- "because topology has no enough nodes to assign backups.");
+ "because topology has no enough nodes to assign backups.",
+ "Affinity function excludeNeighbors property is ignored " +
+ "because topology has no enough nodes to assign backups.");
exclNeighborsWarn = true;
}
@@ -502,6 +452,53 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
return res;
}
+ /**
+ * Creates the assignment for a REPLICATED cache: the first node produced by {@code sortedNodes} is placed
+ * first (primary), followed by every other node of {@code nodes} in topology order.
+ *
+ * @param nodes Topology.
+ * @param sortedNodes Nodes sorted for the specified partition; only the first element is consumed.
+ * @return Assignment containing all nodes of the topology, primary first.
+ */
+ private List<ClusterNode> replicatedAssign(List<ClusterNode> nodes, Iterable<ClusterNode> sortedNodes) {
+ ClusterNode primary = sortedNodes.iterator().next();
+
+ List<ClusterNode> res = new ArrayList<>(nodes.size());
+
+ res.add(primary);
+
+ // Append the remaining nodes, skipping the one already added as primary.
+ for (ClusterNode n : nodes)
+ if (!n.equals(primary))
+ res.add(n);
+
+ assert res.size() == nodes.size() : "Not enough backups: " + res.size();
+
+ return res;
+ }
+
+ /**
+ * Packs two int keys into a single long ({@code key0} in the low 32 bits, {@code key1} in the high 32 bits)
+ * and mixes the result with a hash function based on the Wang/Jenkins 64-bit mix.
+ *
+ * @param key0 Hash key placed into the low 32 bits.
+ * @param key1 Hash key placed into the high 32 bits.
+ * @see <a href="https://gist.github.com/badboy/6267743#64-bit-mix-functions">64 bit mix functions</a>
+ * @return Long hash key.
+ */
+ private static long hash(int key0, int key1) {
+ // Mask before widening so that negative ints do not sign-extend into the other half.
+ long key = (key0 & 0xFFFFFFFFL)
+ | ((key1 & 0xFFFFFFFFL) << 32);
+
+ key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+ key ^= (key >>> 24);
+ key += (key << 3) + (key << 8); // key * 265
+ key ^= (key >>> 14);
+ key += (key << 2) + (key << 4); // key * 21
+ key ^= (key >>> 28);
+ key += (key << 31);
+
+ return key;
+ }
+
/** {@inheritDoc} */
@Override public void reset() {
// No-op.
@@ -534,19 +531,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
- MessageDigest d = digest.get();
-
List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
- Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
-
for (int i = 0; i < parts; i++) {
- List<ClusterNode> partAssignment = assignPartition(d,
- i,
- nodes,
- nodesHash,
- affCtx.backups(),
- neighborhoodCache);
+ List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache);
assignments.add(partAssignment);
}
@@ -590,4 +578,83 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
o1.get2().id().compareTo(o2.get2().id());
}
}
+
+ /**
+ * Iterable view over a node-hash array that yields nodes in {@code COMPARATOR} order.
+ * <p>
+ * The array is either sorted completely in the constructor or ordered lazily: each {@code next()} call past
+ * the already ordered prefix performs one linear scan that swaps the smallest remaining element into place.
+ * The array passed to the constructor is reordered in place.
+ */
+ private static class LazyLinearSortedContainer implements Iterable<ClusterNode> {
+ /** Initial node-hash array. */
+ private final IgniteBiTuple<Long, ClusterNode>[] arr;
+
+ /**
+ * Boundary of the ordered prefix: elements at indexes below this value are already in final order.
+ * Set to {@code arr.length} after a full sort, otherwise to the index most recently filled by an iterator.
+ */
+ private int sorted;
+
+ /**
+ * @param arr Node / partition hash list.
+ * @param needFirstSortedCnt Estimate count of elements to return by iterator.
+ */
+ LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) {
+ this.arr = arr;
+
+ // Sort everything up front when more elements are expected than the truncated natural log of the length.
+ if (needFirstSortedCnt > (int)Math.log(arr.length)) {
+ Arrays.sort(arr, COMPARATOR);
+
+ sorted = arr.length;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<ClusterNode> iterator() {
+ return new SortIterator();
+ }
+
+ /**
+ * Iterator that returns elements of the enclosing array in order, extending the ordered prefix on demand.
+ */
+ private class SortIterator implements Iterator<ClusterNode> {
+ /** Index of the next element to return. */
+ private int cur;
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return cur < arr.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ // Inside the ordered prefix: no scan is needed.
+ if (cur < sorted)
+ return arr[cur++].get2();
+
+ // Otherwise find the minimum of arr[cur..] with a single linear pass.
+ IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+
+ int minIdx = cur;
+
+ for (int i = cur + 1; i < arr.length; i++) {
+ if (COMPARATOR.compare(arr[i], min) < 0) {
+ minIdx = i;
+
+ min = arr[i];
+ }
+ }
+
+ // Swap the minimum into position cur.
+ if (minIdx != cur) {
+ arr[minIdx] = arr[cur];
+
+ arr[cur] = min;
+ }
+
+ // NOTE(review): this stores the index just filled, i.e. one less than the number of ordered elements,
+ // so a fresh iterator repeats one scan at this index (the result is the same). Confirm whether
+ // cur + 1 was intended.
+ sorted = cur++;
+
+ return min.get2();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException("Remove doesn't supported");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1297c38..9350b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -781,17 +781,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
- Collection<ClusterNode> rmts;
-
// If this is the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
+ // Check rebalance state & send CacheAffinityChangeMessage if need.
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal()) {
+ if (cacheCtx == null)
+ continue;
+
+ GridDhtPartitionTopology top = null;
+
+ if (!cacheCtx.isLocal())
+ top = cacheCtx.topology();
+
+ if (top != null)
+ cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId());
+ }
+ }
+
GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;
// No need to send to nodes which did not finish their first exchange.
AffinityTopologyVersion rmtTopVer =
lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
- rmts = CU.remoteNodes(cctx, rmtTopVer);
+ Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
if (log.isDebugEnabled())
log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
index 683ffa2..dfebdbd 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
@@ -17,34 +17,17 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import org.apache.ignite.Ignite;
import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.testframework.GridTestUtils;
/**
* Tests for {@link RendezvousAffinityFunction}.
*/
public class RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest extends AbstractAffinityFunctionSelfTest {
- /** Ignite. */
- private static Ignite ignite;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- ignite = startGrid();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
/** {@inheritDoc} */
@Override protected AffinityFunction affinityFunction() {
AffinityFunction aff = new RendezvousAffinityFunction(512, null);
- GridTestUtils.setFieldValue(aff, "ignite", ignite);
-
return aff;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
new file mode 100644
index 0000000..3e5bae9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
@@ -0,0 +1,1100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityNodeHashResolver;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Simple benchmarks, compatibility test and distribution check utils for affinity functions.
+ * Needs to check changes at the {@link RendezvousAffinityFunction}.
+ */
+public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest {
+ /** MAC prefix. */
+ private static final String MAC_PREF = "MAC";
+
+ /** Ignite. */
+ private static Ignite ignite;
+
+ /** Max experiments. */
+ private static final int MAX_EXPERIMENTS = 200;
+
+ /** Topology modification mode. */
+ private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ // 3 hours expressed in milliseconds.
+ return 3 * 3600 * 1000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // Keep the started grid in the static field; tests in this class inject it into affinity functions.
+ ignite = startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Stops the grid started in beforeTestsStarted().
+ stopAllGrids();
+ }
+
+ /**
+ * Creates test nodes with random IDs; every two consecutive nodes get the same MAC attribute value.
+ *
+ * @param nodesCnt Count of nodes to generate.
+ * @return Nodes list.
+ */
+ private List<ClusterNode> createBaseNodes(int nodesCnt) {
+ List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+
+ for (int i = 0; i < nodesCnt; i++) {
+ GridTestNode node = new GridTestNode(UUID.randomUUID());
+
+ // Integer division: nodes 2k and 2k + 1 share one MAC value, i.e. two neighbour nodes per MAC.
+ node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + i / 2);
+
+ nodes.add(node);
+ }
+ return nodes;
+ }
+
+ /**
+ * Modifies the topology in place: adds a new node on even iterations, removes the last node on odd ones.
+ *
+ * @param nodes Topology.
+ * @param prevAssignment Previous affinity.
+ * @param iter Number of iteration.
+ * @param backups Backups count.
+ * @return Affinity context.
+ */
+ private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes,
+ List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+ DiscoveryEvent discoEvt;
+
+ discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, nodes.size() - 1);
+
+ // The topology version is taken from the node count after the modification.
+ return new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
+ }
+
+ /**
+ * Removes the node at the given index from the topology and wraps it into an {@code EVT_NODE_LEFT} event.
+ *
+ * @param nodes Topology.
+ * @param idx Index of node to remove.
+ * @return Discovery event.
+ */
+ @NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) {
+ // Arguments are evaluated left to right: nodes.get(0) is read before the removal, so for idx == 0
+ // the first argument is the removed node itself.
+ return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx));
+ }
+
+ /**
+ * Modifies the topology in place: adds a new node on even iterations, removes the first node on odd ones.
+ *
+ * @param nodes Topology.
+ * @param prevAssignment Previous affinity.
+ * @param iter Number of iteration.
+ * @param backups Backups count.
+ * @return Affinity context.
+ */
+ private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes,
+ List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+ DiscoveryEvent discoEvt;
+
+ discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, 0);
+
+ // The topology version is taken from the node count after the modification.
+ return new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
+ }
+
+ /**
+ * Appends a new test node with a random ID to the topology and wraps it into an {@code EVT_NODE_JOINED} event.
+ *
+ * @param nodes Topology.
+ * @param iter Iteration number; used to derive the MAC attribute of the new node.
+ * @return Discovery event.
+ */
+ @NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) {
+ GridTestNode node = new GridTestNode(UUID.randomUUID());
+
+ // iter / 4: four consecutive iteration numbers map to the same MAC value. When nodes are added only on
+ // even iterations this yields two neighbour nodes per MAC.
+ node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + "_add_" + iter / 4);
+
+ nodes.add(node);
+
+ return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, node);
+ }
+
+ /**
+ * Modifies the topology according to the current {@code mode}, then runs the affinity function and measures
+ * the wall-clock time of the {@code assignPartitions} call.
+ *
+ * @param aff Affinity function.
+ * @param nodes Topology; modified in place for every mode except {@code NONE}.
+ * @param prevAssignment Previous affinity assignment.
+ * @param backups Backups count.
+ * @param iter Number of iteration.
+ * @return Tuple with the time spent on the affinity calculation (milliseconds) and the resulting affinity.
+ */
+ private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff,
+ List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) {
+
+ // NOTE(review): there is no default branch, so ctx stays null for a mode not listed below.
+ GridAffinityFunctionContextImpl ctx = null;
+ switch (mode) {
+ case CHANGE_LAST_NODE:
+ ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups);
+ break;
+ case CHANGE_FIRST_NODE:
+ ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups);
+ break;
+
+ case ADD:
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ case REMOVE_RANDOM:
+ // NOTE(review): despite the mode name, the last node is removed, not a random one.
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, removeNode(nodes, nodes.size() - 1),
+ new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ case NONE:
+ // Topology is left unchanged; a join event for the last node is passed to the context.
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment,
+ new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)),
+ new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ }
+
+ // Only the affinity calculation itself is timed; the topology modification above is excluded.
+ long start = System.currentTimeMillis();
+
+ List<List<ClusterNode>> assignments = aff.assignPartitions(ctx);
+
+ return F.t(System.currentTimeMillis() - start, assignments);
+ }
+
+ /**
+ * Computes the arithmetic mean of the measures.
+ *
+ * @param lst List of measures.
+ * @return Average of measures; {@code 0} for an empty collection.
+ */
+ private double average(Collection<Long> lst) {
+ if (lst.isEmpty())
+ return 0;
+
+ long sum = 0;
+
+ for (long l : lst)
+ sum += l;
+
+ // Cast before dividing to avoid integer division.
+ return (double)sum / lst.size();
+ }
+
+ /**
+ * Computes the spread of the measures as the square root of the mean squared deviation from {@code avg}
+ * (i.e. the population standard deviation when {@code avg} is the mean).
+ *
+ * @param lst List of measures.
+ * @param avg Average of the measures.
+ * @return Square root of the mean squared deviation; {@code 0} for an empty collection.
+ */
+ private double variance(Collection<Long> lst, double avg) {
+ if (lst.isEmpty())
+ return 0;
+
+ // Accumulate in double: with a long accumulator the compound assignment narrows every squared deviation
+ // to long, dropping its fractional part (e.g. measures {1, 2} with avg 1.5 would sum to 0).
+ double sum = 0;
+
+ for (long l : lst)
+ sum += (l - avg) * (l - avg);
+
+ return Math.sqrt(sum / lst.size());
+ }
+
+ /**
+ * The table with count of partitions on node:
+ *
+ * column 0 - primary partitions counts
+ * column 1 - backup#0 partitions counts
+ * etc
+ *
+ * Rows correspond to the nodes, in the iteration order of {@code nodes}.
+ *
+ * @param lst Affinity result; must be non-empty, and every partition's list must be at least as long as
+ * the first one.
+ * @param nodes Topology.
+ * @return Frequency distribution: counts of partitions on node.
+ */
+ private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) {
+ List<Map<ClusterNode, AtomicInteger>> nodeMaps = new ArrayList<>();
+
+ // Number of columns: the size of the first partition's assignment.
+ int backups = lst.get(0).size();
+
+ // For every assignment position, count how many partitions each node holds at that position.
+ for (int i = 0; i < backups; ++i) {
+ Map<ClusterNode, AtomicInteger> map = new HashMap<>();
+
+ for (List<ClusterNode> l : lst) {
+ ClusterNode node = l.get(i);
+
+ if (!map.containsKey(node))
+ map.put(node, new AtomicInteger(1));
+ else
+ map.get(node).incrementAndGet();
+ }
+
+ nodeMaps.add(map);
+ }
+
+ // Transpose the per-position maps into one row per node; nodes absent at a position get 0.
+ List<List<Integer>> byNodes = new ArrayList<>(nodes.size());
+ for (ClusterNode node : nodes) {
+ List<Integer> byBackups = new ArrayList<>(backups);
+
+ for (int j = 0; j < backups; ++j) {
+ if (nodeMaps.get(j).get(node) == null)
+ byBackups.add(0);
+ else
+ byBackups.add(nodeMaps.get(j).get(node).get());
+ }
+
+ byNodes.add(byBackups);
+ }
+ return byNodes;
+ }
+
+ /**
+ * Writes the frequency distribution to a file: one line per node, each count printed as a zero-padded
+ * five-digit number followed by a space. The file path is the zero-padded three-digit node count
+ * followed by {@code suffix}, resolved by the default file system.
+ *
+ * @param byNodes Frequency distribution.
+ * @param suffix Label suffix.
+ * @throws IOException On error.
+ */
+ private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException {
+ int nodes = byNodes.size();
+
+ try (PrintStream ps = new PrintStream(Files.newOutputStream(FileSystems.getDefault()
+ .getPath(String.format("%03d", nodes) + suffix)))) {
+
+ for (List<Integer> byNode : byNodes) {
+ for (int w : byNode)
+ ps.print(String.format("%05d ", w));
+
+ ps.println("");
+ }
+ }
+ }
+
+ /**
+ * Chi-square test of the distribution with uniform distribution. Only column 0 of every row (the primary
+ * partitions count) takes part in the calculation.
+ *
+ * @param byNodes Distribution.
+ * @param parts Partitions count.
+ * @param goldenNodeWeight Expected weight of a node under the uniform distribution.
+ * @return Chi-square test.
+ */
+ private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) {
+ double sum = 0;
+
+ for (List<Integer> byNode : byNodes) {
+ // Observed share of primary partitions held by this node.
+ double w = (double)byNode.get(0) / parts;
+
+ sum += (goldenNodeWeight - w) * (goldenNodeWeight - w) / goldenNodeWeight;
+ }
+ return sum;
+ }
+
+ /**
+ * Compares the partition distribution of {@link RendezvousAffinityFunction} with that of
+ * {@code RendezvousAffinityFunctionOld}, both created with the same constructor arguments.
+ *
+ * @throws IOException On error.
+ */
+ public void testDistribution() throws IOException {
+ AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024);
+
+ AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ // The old implementation has an "ignite" field that is set here directly.
+ GridTestUtils.setFieldValue(aff1, "ignite", ignite);
+
+ affinityDistribution(aff0, aff1);
+ }
+
+ /**
+ * For a range of topology sizes, calculates the affinity with both functions, logs the chi-square value of
+ * each primary distribution against the uniform one and dumps both distributions to files.
+ *
+ * @param aff0 Affinity function to compare.
+ * @param aff1 Affinity function to compare.
+ */
+ private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) {
+ int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600};
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ // Two calculations per function; only the result of the second one is analysed.
+ assignPartitions(aff0, nodes0, null, 2, 0).get2();
+ List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2();
+
+ assignPartitions(aff1, nodes1, null, 2, 0).get2();
+ List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2();
+
+ List<List<Integer>> dist0 = freqDistribution(lst0, nodes0);
+ List<List<Integer>> dist1 = freqDistribution(lst1, nodes1);
+
+ // Each distribution is normalised by the partition count of the function that produced it.
+ info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;",
+ nodesCnt,
+ aff0.getClass().getSimpleName(),
+ chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt),
+ aff1.getClass().getSimpleName(),
+ chiSquare(dist1, aff1.partitions(), 1.0 / nodesCnt)));
+
+ try {
+ printDistribution(dist0, "." + aff0.getClass().getSimpleName());
+ printDistribution(dist1, "." + aff1.getClass().getSimpleName());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Benchmarks the old affinity function against {@link RendezvousAffinityFunction} in the {@code ADD}
+ * topology modification mode.
+ */
+ public void testAffinityBenchmarkAdd() {
+ mode = TopologyModificationMode.ADD;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ // The old implementation has an "ignite" field that is set here directly.
+ GridTestUtils.setFieldValue(aff0, "ignite", ignite);
+
+ affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
+ }
+
+ /**
+ * Benchmarks the old affinity function against {@link RendezvousAffinityFunction} in the
+ * {@code CHANGE_LAST_NODE} topology modification mode.
+ */
+ public void testAffinityBenchmarkChangeLast() {
+ mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ // The old implementation has an "ignite" field that is set here directly.
+ GridTestUtils.setFieldValue(aff0, "ignite", ignite);
+
+ affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
+ }
+
+ /**
+ * Repeats {@code assignPartitions} {@code MAX_EXPERIMENTS} times for each function on several cluster sizes,
+ * collects the first element of every returned tuple and logs its average and {@code variance(...)} per size.
+ *
+ * @param aff0 Affinity function to compare.
+ * @param aff1 Affinity function to compare.
+ */
+ private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) {
+ // NOTE(review): 100 is listed twice; the first run presumably serves as a warm-up — confirm.
+ int[] nodesCnts = {100, 4, 100, 200, 300, 400, 500, 600};
+
+ final int backups = 2;
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ List<Long> times0 = new ArrayList<>(MAX_EXPERIMENTS);
+ List<Long> times1 = new ArrayList<>(MAX_EXPERIMENTS);
+
+ // Initial assignment without a previous one; its tuple value is not recorded.
+ List<List<ClusterNode>> prevAssignment =
+ assignPartitions(aff0, nodes0, null, backups, 0).get2();
+
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ IgniteBiTuple<Long, List<List<ClusterNode>>> aa
+ = assignPartitions(aff0, nodes0, prevAssignment, backups, i);
+
+ // Each iteration feeds the previous result back into the next call.
+ prevAssignment = aa.get2();
+
+ times0.add(aa.get1());
+ }
+
+ // Same procedure for the second function on its own node list.
+ prevAssignment = assignPartitions(aff1, nodes1, null, backups, 0).get2();
+
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ IgniteBiTuple<Long, List<List<ClusterNode>>> aa
+ = assignPartitions(aff1, nodes1, prevAssignment, backups, i);
+
+ prevAssignment = aa.get2();
+
+ times1.add(aa.get1());
+ }
+
+ double avr0 = average(times0);
+ double var0 = variance(times0, avr0);
+
+ double avr1 = average(times1);
+ double var1 = variance(times1, avr1);
+
+ // NOTE(review): the "+/-" values come from variance(...) but are labelled "ms"; confirm whether a
+ // standard deviation is intended.
+ info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;",
+ nodesCnt,
+ aff0.getClass().getSimpleName(),
+ avr0, var0,
+ aff1.getClass().getSimpleName(),
+ avr1, var1));
+ }
+ }
+
+ /**
+ * Sums, over all partitions, the number of distinct nodes that are present in the new assignment of a
+ * partition but absent from its old assignment.
+ *
+ * @param affOld Old affinity.
+ * @param affNew New affinity.
+ * @return Count of partitions to migrate, or {@code 0} if either argument is {@code null}.
+ */
+ private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) {
+ if (affOld == null || affNew == null)
+ return 0;
+
+ assertEquals(affOld.size(), affNew.size());
+
+ int migrated = 0;
+
+ for (int part = 0; part < affOld.size(); ++part) {
+ Collection<ClusterNode> oldOwners = new HashSet<>(affOld.get(part));
+ Collection<ClusterNode> newOwners = new HashSet<>(affNew.get(part));
+
+ // What remains after the set difference are the owners that were not there before.
+ newOwners.removeAll(oldOwners);
+
+ migrated += newOwners.size();
+ }
+
+ return migrated;
+ }
+
+ /**
+ * Logs, per cluster size, the averaged {@code countPartitionsToMigrate} result of
+ * {@link RendezvousAffinityFunction} and {@link FairAffinityFunction} next to a "golden" value computed as
+ * {@code partitions / nodesCnt * (backups + 1)}.
+ */
+ public void testPartitionsMigrate() {
+ int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600};
+
+ final int backups = 2;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256);
+ AffinityFunction aff1 = new FairAffinityFunction(true, 256);
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ List<List<ClusterNode>> affPrev = null;
+
+ int diffCnt0 = 0;
+
+ // Baseline assignment, then accumulate the difference between consecutive assignments.
+ affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2();
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2();
+ diffCnt0 += countPartitionsToMigrate(affPrev, affCur);
+ affPrev = affCur;
+ }
+
+ // Same procedure for the second function on its own node list.
+ affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2();
+ int diffCnt1 = 0;
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2();
+ diffCnt1 += countPartitionsToMigrate(affPrev, affCur);
+ affPrev = affCur;
+ }
+
+ // NOTE(review): each loop above accumulates MAX_EXPERIMENTS differences, yet the averages below divide
+ // by MAX_EXPERIMENTS - 1 — presumably one iteration leaves the topology unchanged; confirm.
+ double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1);
+ info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;",
+ nodesCnt, goldenChangeAffinity,
+ aff0.getClass().getSimpleName(),
+ (double)diffCnt0 / (MAX_EXPERIMENTS - 1),
+ aff1.getClass().getSimpleName(),
+ (double)diffCnt1 / (MAX_EXPERIMENTS - 1)));
+ }
+ }
+
+ /**
+ * Checks that the new rendezvous function yields the same assignments as the old implementation.
+ * NOTE(review): the leading underscore presumably keeps the test runner from picking this method up — confirm.
+ */
+ public void _testAffinityCompatibility() {
+ mode = TopologyModificationMode.ADD;
+
+ AffinityFunction newAff = new RendezvousAffinityFunction(true, 1024);
+
+ // Use the full copy of the old implementation of the RendezvousAffinityFunction to check the compatibility.
+ AffinityFunction oldAff = new RendezvousAffinityFunctionOld(true, 1024);
+
+ GridTestUtils.setFieldValue(oldAff, "ignite", ignite);
+
+ affinityCompatibility(newAff, oldAff);
+ }
+
+ /**
+ * Asserts, for several cluster sizes, that both functions produce equal assignments for the same node list.
+ * Sets {@code mode} to {@link TopologyModificationMode#NONE} before assigning.
+ *
+ * @param aff0 Affinity function to compare.
+ * @param aff1 Affinity function to compare.
+ */
+ private void affinityCompatibility(AffinityFunction aff0, AffinityFunction aff1) {
+ final int backups = 2;
+
+ mode = TopologyModificationMode.NONE;
+
+ for (int nodesCnt : new int[] {64, 100, 200, 300, 400, 500, 600}) {
+ List<ClusterNode> nodes = createBaseNodes(nodesCnt);
+
+ List<List<ClusterNode>> expected = assignPartitions(aff0, nodes, null, backups, 0).get2();
+
+ List<List<ClusterNode>> actual = assignPartitions(aff1, nodes, null, backups, 0).get2();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ /**
+ * Topology modification modes; the tests assign one of them to the {@code mode} field before running.
+ */
+ private enum TopologyModificationMode {
+ /** Change the last node. */
+ CHANGE_LAST_NODE,
+
+ /** Change the first node. */
+ CHANGE_FIRST_NODE,
+
+ /** Add. */
+ ADD,
+
+ /** Remove random. */
+ REMOVE_RANDOM,
+
+ /** Do nothing. */
+ NONE
+ }
+
+ /**
+ * Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance.
+ * The code is kept verbatim on purpose: it is the baseline the new implementation is compared against.
+ */
+ private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Default number of partitions. */
+ public static final int DFLT_PARTITION_COUNT = 1024;
+
+ /** Comparator. */
+ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
+
+ /** Thread local message digest. */
+ private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
+ @Override protected MessageDigest initialValue() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ assert false : "Should have failed in constructor";
+
+ throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
+ }
+ }
+ };
+
+ /** Number of partitions. */
+ private int parts;
+
+ /** Exclude neighbors flag. */
+ private boolean exclNeighbors;
+
+ /** Exclude neighbors warning. */
+ private transient boolean exclNeighborsWarn;
+
+ /** Optional backup filter. First node is primary, second node is a node being tested. */
+ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+ /** Optional affinity backups filter. The first node is a node being tested,
+ * the second is a list of nodes that are already assigned for a given partition (the first node in the list
+ * is primary). */
+ private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
+
+ /** Hash ID resolver. */
+ private AffinityNodeHashResolver hashIdRslvr = null;
+
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Logger instance. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /**
+ * Empty constructor with all defaults.
+ */
+ public RendezvousAffinityFunctionOld() {
+ this(false);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and the default number of partitions ({@link #DFLT_PARTITION_COUNT}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ */
+ public RendezvousAffinityFunctionOld(boolean exclNeighbors) {
+ this(exclNeighbors, DFLT_PARTITION_COUNT);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and specified number of partitions.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ * @param parts Total number of partitions.
+ */
+ public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) {
+ this(exclNeighbors, parts, null);
+ }
+
+ /**
+ * Initializes affinity with specified number of partitions and an optional backup filter;
+ * the exclude neighbors flag is set to {@code false}.
+ *
+ * @param parts Total number of partitions.
+ * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
+ * from all nodes that pass this filter. First argument for this filter is primary node, and second
+ * argument is node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ */
+ public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this(false, parts, backupFilter);
+ }
+
+ /**
+ * Private constructor. Validates the partition count and checks that an MD5 digest can be obtained.
+ *
+ * @param exclNeighbors Exclude neighbors flag.
+ * @param parts Partitions count.
+ * @param backupFilter Backup filter.
+ */
+ private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts,
+ IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ A.ensure(parts > 0, "parts > 0");
+
+ this.exclNeighbors = exclNeighbors;
+ this.parts = parts;
+ this.backupFilter = backupFilter;
+
+ // Probe MD5 availability here so that the thread-local initializer is not the first place to fail.
+ try {
+ MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
+ }
+ }
+
+ /**
+ * Gets total number of key partitions. To ensure that all partitions are
+ * equally distributed across all nodes, please make sure that this
+ * number is significantly larger than a number of nodes. Also, partition
+ * size should be relatively small. Try to avoid having partitions with more
+ * than quarter million keys.
+ * <p>
+ * Note that for fully replicated caches this method should always
+ * return {@code 1}.
+ *
+ * @return Total partition count.
+ */
+ public int getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets total number of partitions. Only the upper bound
+ * ({@code CacheConfiguration.MAX_PARTITIONS_COUNT}) is checked here.
+ *
+ * @param parts Total number of partitions.
+ */
+ public void setPartitions(int parts) {
+ A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
+ this.parts = parts;
+ }
+
+ /**
+ * Gets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed on different locations in the hash ring, and hence causing
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts node on the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @return Hash ID resolver.
+ * @deprecated Deprecated together with {@link #setHashIdResolver(AffinityNodeHashResolver)}.
+ */
+ @Deprecated
+ public AffinityNodeHashResolver getHashIdResolver() {
+ return hashIdRslvr;
+ }
+
+ /**
+ * Sets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed on different locations in the hash ring, and hence causing
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts node on the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @param hashIdRslvr Hash ID resolver.
+ *
+ * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead.
+ */
+ @Deprecated
+ public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) {
+ this.hashIdRslvr = hashIdRslvr;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+ return backupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param backupFilter Optional backup filter.
+ * @deprecated Use {@code affinityBackupFilter} instead.
+ */
+ @Deprecated
+ public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is a node being tested,
+ * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+ * the first in the list).
+ * <p>
+ * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() {
+ return affinityBackupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is a node being tested,
+ * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+ * the first in the list).
+ * <p>
+ * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param affinityBackupFilter Optional backup filter.
+ */
+ public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode,
+ List<ClusterNode>> affinityBackupFilter) {
+ this.affinityBackupFilter = affinityBackupFilter;
+ }
+
+ /**
+ * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public boolean isExcludeNeighbors() {
+ return exclNeighbors;
+ }
+
+ /**
+ * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public void setExcludeNeighbors(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
+ }
+
+ /**
+ * Resolves node hash: the hash ID resolver result if a resolver is set, the node consistent ID otherwise.
+ *
+ * @param node Cluster node.
+ * @return Node hash.
+ */
+ public Object resolveNodeHash(ClusterNode node) {
+ if (hashIdRslvr != null)
+ return hashIdRslvr.resolve(node);
+ else
+ return node.consistentId();
+ }
+
+ /**
+ * Returns collection of nodes (primary first) for specified partition.
+ * <p>
+ * Every node gets a hash derived from the MD5 digest of the partition number followed by the marshalled
+ * node hash; nodes are sorted by that hash, the first one becomes primary and backups are picked
+ * from the following ones.
+ *
+ * @param d Message digest. If {@code null}, the thread local digest is used.
+ * @param part Partition.
+ * @param nodes Nodes.
+ * @param nodesHash Serialized nodes hashes. Missing entries are computed and put into this map.
+ * @param backups Number of backups.
+ * @param neighborhoodCache Neighborhood.
+ * @return Assignment. If {@code nodes} has at most one element, the passed list itself is returned.
+ */
+ public List<ClusterNode> assignPartition(MessageDigest d,
+ int part,
+ List<ClusterNode> nodes,
+ Map<ClusterNode, byte[]> nodesHash,
+ int backups,
+ @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+ if (nodes.size() <= 1)
+ return nodes;
+
+ if (d == null)
+ d = digest.get();
+
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
+
+ try {
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ byte[] nodeHashBytes = nodesHash.get(node);
+
+ if (nodeHashBytes == null) {
+ Object nodeHash = resolveNodeHash(node);
+
+ byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+ // Add 4 bytes for partition bytes.
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+ System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+ nodesHash.put(node, nodeHashBytes);
+ }
+
+ // The cached array is reused for every partition: only its 4-byte prefix is overwritten.
+ U.intToBytes(part, nodeHashBytes, 0);
+
+ d.reset();
+
+ byte[] bytes = d.digest(nodeHashBytes);
+
+ // Pack the first 8 digest bytes into a long, bytes[0] being the least significant byte.
+ long hash =
+ (bytes[0] & 0xFFL)
+ | ((bytes[1] & 0xFFL) << 8)
+ | ((bytes[2] & 0xFFL) << 16)
+ | ((bytes[3] & 0xFFL) << 24)
+ | ((bytes[4] & 0xFFL) << 32)
+ | ((bytes[5] & 0xFFL) << 40)
+ | ((bytes[6] & 0xFFL) << 48)
+ | ((bytes[7] & 0xFFL) << 56);
+
+ lst.add(F.t(hash, node));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ Collections.sort(lst, COMPARATOR);
+
+ // Integer.MAX_VALUE backups means "all nodes"; checked first so that backups + 1 is not evaluated for it.
+ int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+
+ List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
+
+ ClusterNode primary = lst.get(0).get2();
+
+ res.add(primary);
+
+ // Select backups.
+ if (backups > 0) {
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ // Precedence: exclNeighbors disables both filters; otherwise a node is accepted if
+ // affinityBackupFilter accepts it, or else if backupFilter accepts it, or if no filter is set.
+ if (exclNeighbors) {
+ Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
+
+ if (!allNeighbors.contains(node))
+ res.add(node);
+ }
+ else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+ res.add(next.get2());
+ else if (backupFilter != null && backupFilter.apply(primary, node))
+ res.add(next.get2());
+ else if (affinityBackupFilter == null && backupFilter == null)
+ res.add(next.get2());
+ }
+ }
+
+ if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
+ // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ if (!res.contains(node))
+ res.add(next.get2());
+ }
+
+ // The flag ensures that this instance issues the warning only once.
+ if (!exclNeighborsWarn) {
+ LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
+ "because topology has no enough nodes to assign backups.");
+
+ exclNeighborsWarn = true;
+ }
+ }
+
+ assert res.size() <= primaryAndBackups;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ if (key == null)
+ throw new IllegalArgumentException("Null key is passed for a partition calculation. " +
+ "Make sure that an affinity key that is used is initialized properly.");
+
+ return U.safeAbs(key.hashCode() % parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+ // The neighborhood map is only built when neighbors have to be excluded.
+ Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+ GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+
+ MessageDigest d = digest.get();
+
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ // Shared across all partitions so that each node hash is marshalled at most once per call.
+ Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
+ for (int i = 0; i < parts; i++) {
+ List<ClusterNode> partAssignment = assignPartition(d,
+ i,
+ nodes,
+ nodesHash,
+ affCtx.backups(),
+ neighborhoodCache);
+
+ assignments.add(partAssignment);
+ }
+
+ return assignments;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes partition count, exclude neighbors flag, hash ID resolver and backup filter, in that order.
+ * {@code affinityBackupFilter} is not written.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(parts);
+ out.writeBoolean(exclNeighbors);
+ out.writeObject(hashIdRslvr);
+ out.writeObject(backupFilter);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the fields in the order {@link #writeExternal(ObjectOutput)} writes them.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ parts = in.readInt();
+ exclNeighbors = in.readBoolean();
+ hashIdRslvr = (AffinityNodeHashResolver)in.readObject();
+ backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
+ }
+
+ /**
+ * Orders tuples by hash in ascending order; equal hashes are ordered by node ID.
+ */
+ private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+ return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
+ o1.get2().id().compareTo(o2.get2().id());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
index ed47c57..cffa277 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
@@ -17,34 +17,17 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import org.apache.ignite.Ignite;
import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.testframework.GridTestUtils;
/**
* Tests for {@link RendezvousAffinityFunction}.
*/
public class RendezvousAffinityFunctionStandardHashSelfTest extends AbstractAffinityFunctionSelfTest {
- /** Ignite. */
- private static Ignite ignite;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- ignite = startGrid();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
/** {@inheritDoc} */
@Override protected AffinityFunction affinityFunction() {
AffinityFunction aff = new RendezvousAffinityFunction(513, null);
- GridTestUtils.setFieldValue(aff, "ignite", ignite);
-
return aff;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 01aa256..dff827d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -101,6 +101,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
private static final String STATIC_CACHE = "static-cache";
/** */
+ private static final int CACHE_PUTS_CNT = 3;
+
+ /** */
private UUID nodeId;
/** {@inheritDoc} */
@@ -580,17 +583,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
new CI1<IgniteCache<Object, Object>>() {
@Override public void apply(IgniteCache<Object, Object> cache) {
try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) {
- log.info("Put1: " + key);
-
- cache.put(key, key);
-
- Integer key2 = key + 1;
-
- log.info("Put2: " + key2);
-
- cache.put(key2, key2);
-
- log.info("Commit [key1=" + key + ", key2=" + key2 + ']');
+ for (int i = 0; i < CACHE_PUTS_CNT; ++i)
+ cache.put(key + i, key + i);
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
index ed473d8..6cac96c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
@@ -114,6 +114,8 @@ public class BinaryEnumsSelfTest extends GridCommonAbstractTest {
node2 = startGrid(1);
cache2 = node2.cache(CACHE_NAME);
cacheBinary2 = cache2.withKeepBinary();
+
+ awaitPartitionMapExchange();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
deleted file mode 100644
index 2d46cf4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.GridTestNode;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTest {
- /** */
- public static final int NODES_CNT = 50;
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionSpreading() throws Exception {
- System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", "Max", "Dev");
-
- for (int i = 5; i < NODES_CNT; i = i * 3 / 2) {
- for (int replicas = 128; replicas <= 4096; replicas*=2) {
- Collection<ClusterNode> nodes = createNodes(i, replicas);
-
- RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 10000);
-
- checkDistribution(aff, nodes);
- }
-
- System.out.println();
- }
- }
-
- /**
- * @param nodesCnt Nodes count.
- * @param replicas Value of
- * @return Collection of test nodes.
- */
- private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) {
- Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt);
-
- for (int i = 0; i < nodesCnt; i++)
- nodes.add(new TestRichNode(replicas));
-
- return nodes;
- }
-
- /**
- * @param aff Affinity to check.
- * @param nodes Collection of nodes to test on.
- */
- private void checkDistribution(RendezvousAffinityFunction aff, Collection<ClusterNode> nodes) {
- Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
-
- for (int part = 0; part < aff.getPartitions(); part++) {
- Collection<ClusterNode> affNodes = aff.assignPartition(null,
- part,
- new ArrayList<>(nodes),
- new HashMap<ClusterNode, byte[]>(),
- 0,
- null);
-
- assertEquals(1, affNodes.size());
-
- ClusterNode node = F.first(affNodes);
-
- parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1);
- }
-
- int min = Integer.MAX_VALUE;
- int max = Integer.MIN_VALUE;
- int total = 0;
-
- float mean = 0;
- float m2 = 0;
- int n = 0;
-
- for (ClusterNode node : nodes) {
- int partsCnt = parts.get(node) != null ? parts.get(node) : 0;
-
- total += partsCnt;
-
- if (partsCnt < min)
- min = partsCnt;
-
- if (partsCnt > max)
- max = partsCnt;
-
- n++;
- float delta = partsCnt - mean;
- mean += delta / n;
- m2 += delta * (partsCnt - mean);
- }
-
- m2 /= (n - 1);
- assertEquals(aff.getPartitions(), total);
-
- System.out.printf("%6s, %6s, %6s, %8.4f\n", nodes.size(),min, max, Math.sqrt(m2));
- }
-
- /**
- * Rich node stub to use in emulated server topology.
- */
- private static class TestRichNode extends GridTestNode {
- /** */
- private final UUID nodeId;
-
- /** */
- private final int replicas;
-
- /**
- * Externalizable class requires public no-arg constructor.
- */
- @SuppressWarnings("UnusedDeclaration")
- private TestRichNode(int replicas) {
- this(UUID.randomUUID(), replicas);
- }
-
- /**
- * Constructs rich node stub to use in emulated server topology.
- *
- * @param nodeId Node id.
- */
- private TestRichNode(UUID nodeId, int replicas) {
- this.nodeId = nodeId;
- this.replicas = replicas;
- }
-
- /**
- * Unused constructor for externalizable support.
- */
- public TestRichNode() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public UUID id() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public <T> T attribute(String name) {
- return super.attribute(name);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index db11291..c17e9f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
* @throws Exception If failed.
*/
public void testQueue() throws Exception {
- final String queueName = "q";
+ final String queueName = "qq";
System.out.println(U.filler(20, '\n'));
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index 9e79a27..c8568d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -102,6 +102,8 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
startGrid(1);
startGrid(2);
+ awaitPartitionMapExchange();
+
final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
index f1c791e..47d54d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
@@ -71,13 +71,13 @@ public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest {
Ignite ignite1 = startGrid(1);
+ awaitPartitionMapExchange();
+
// This key should become primary for ignite1.
- final Integer key = ignite0.configuration().getMarshaller() instanceof OptimizedMarshaller ? 2 : 7;
+ final Integer key = primaryKey(ignite1.cache(null));
assertNull(cache.getAndPut(key, key));
- awaitPartitionMapExchange();
-
assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
index 61ee9ea..eebafed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
@@ -52,7 +52,7 @@ public class GridCacheRebalancingPartitionDistributionTest extends GridRollingRe
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(1)
- .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 271))
+ .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 1024))
.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK)
.setRebalanceMode(CacheRebalanceMode.SYNC)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
index 0e33650..c0f836b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
@@ -25,6 +25,8 @@ import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryReader;
@@ -109,7 +111,12 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
@Override public void run(IgniteServices services, String svcName, TestService svc) {
IgniteCache<Object, Object> cache = grid(testedNodeIdx).getOrCreateCache(CACHE_NAME);
- services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), "1");
+ try {
+ services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), primaryKey(cache));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
}));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
index 026e6a6..fb6c0f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
@@ -83,7 +83,7 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest {
final String svcName = "myService";
- svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key");
+ svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, primaryKey(ig.cache(cacheName)));
boolean res = GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
@@ -125,7 +125,15 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest {
final String svcName = "myService";
- svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key");
+ ig.createCache(ccfg);
+
+ Object key = primaryKey(ig.cache(cacheName));
+
+ ig.destroyCache(cacheName);
+
+ awaitPartitionMapExchange();
+
+ svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
assert svcs.service(svcName) == null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index 09d4765..d331387 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -53,8 +53,8 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
/** */
private UUID id;
- /** */
- private Object consistentId = consistentIdCtr.incrementAndGet();
+ /** A String consistent ID is closer to the real-world case than an Integer. */
+ private Object consistentId = "Node_" + consistentIdCtr.incrementAndGet();
/** */
private ClusterMetrics metrics;
@@ -247,13 +247,5 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
/** {@inheritDoc} */
@Override public String toString() {
return id.toString();
-// StringBuilder buf = new StringBuilder();
-//
-// buf.append(getClass().getSimpleName());
-// buf.append(" [attrs=").append(attrs);
-// buf.append(", id=").append(id);
-// buf.append(']');
-//
-// return buf.toString();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
index 7f59a4b..71e737f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
@@ -21,9 +21,11 @@ import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -157,10 +159,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
for (int i = 0; i < PARTS_CNT; ++i) {
- grid(0).compute().affinityRun(
- Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
- new Integer(i),
- new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(i),
+ new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+ }
+ catch (IgniteException e) {
+ checkException(e, ClusterTopologyException.class);
+ }
}
}
}, AFFINITY_THREADS_CNT, "affinity-run");
@@ -204,10 +211,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
if (System.currentTimeMillis() >= endTime)
break;
- grid(0).compute().affinityRun(
- Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
- new Integer(i),
- new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(i),
+ new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+ }
+ catch (IgniteException e) {
+ checkException(e, ClusterTopologyException.class);
+ }
}
}
}, AFFINITY_THREADS_CNT, "affinity-run");
@@ -229,6 +241,24 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
}
}
+
+    /**
+     * Passes if {@code e} or one of its causes is an instance of {@code exCls}; otherwise rethrows {@code e}.
+     * @param e Exception to check.
+     * @param exCls Expected exception class (subclasses match as well).
+     */
+    private void checkException(IgniteException e, Class<? extends Exception> exCls) {
+        for (Throwable t = e; t != null; t = t.getCause()) {
+            if (exCls.isInstance(t)) { // Not t.getClass().isAssignableFrom(exCls): that matches superclasses.
+                log.info("Expected exception: " + e);
+
+                return;
+            }
+        }
+
+        throw e;
+    }
+
/** */
private static class NotReservedCacheOpAffinityRun implements IgniteRunnable {
/** Org id. */