You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/13 13:31:08 UTC
[21/21] ignite git commit: Merge master into ignite-3477-master
Merge master into ignite-3477-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/48494478
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/48494478
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/48494478
Branch: refs/heads/ignite-3477-master
Commit: 48494478f14eb76b731a483e868d5e1bc7c58bb7
Parents: 1218c41 d298e75
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Apr 13 16:30:54 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Apr 13 16:30:54 2017 +0300
----------------------------------------------------------------------
.../jmh/future/JmhFutureAdapterBenchmark.java | 145 +++
.../java/org/apache/ignite/IgniteCache.java | 265 ++--
.../rendezvous/RendezvousAffinityFunction.java | 283 ++--
.../ignite/internal/IgniteInternalFuture.java | 15 -
.../eventstorage/GridEventStorageManager.java | 341 ++---
.../cache/GridCacheCompoundFuture.java | 63 +
.../cache/GridCacheCompoundIdentityFuture.java | 63 +
.../processors/cache/GridCacheFuture.java | 15 +
.../cache/GridCacheFutureAdapter.java | 61 +
.../GridCachePartitionExchangeManager.java | 16 +
.../distributed/GridCacheTxRecoveryFuture.java | 9 +-
.../dht/CacheDistributedGetFutureAdapter.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 33 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 16 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 4 +-
.../GridNearAtomicAbstractUpdateFuture.java | 8 +-
.../GridNearAtomicSingleUpdateFuture.java | 24 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 24 +-
.../colocated/GridDhtColocatedLockFuture.java | 23 +-
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../distributed/near/GridNearLockFuture.java | 20 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 9 +-
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTxFinishFuture.java | 13 +-
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../near/GridNearTxPrepareFutureAdapter.java | 4 +-
.../cache/local/GridLocalLockFuture.java | 6 +-
.../query/GridCacheDistributedQueryFuture.java | 18 +-
.../query/GridCacheQueryFutureAdapter.java | 31 +-
.../transactions/IgniteTransactionsImpl.java | 4 +-
.../cache/transactions/TxDeadlockDetection.java | 5 +-
.../datastructures/GridCacheSemaphoreImpl.java | 12 +
.../platform/compute/PlatformCompute.java | 10 -
.../tcp/GridTcpMemcachedNioListener.java | 20 +-
.../util/future/GridCompoundFuture.java | 45 +-
.../util/future/GridFinishedFuture.java | 13 -
.../internal/util/future/GridFutureAdapter.java | 479 ++++---
.../internal/util/future/IgniteFutureImpl.java | 10 -
.../ignite/internal/util/lang/GridFunc.java | 1223 ++----------------
.../ignite/internal/util/lang/GridTupleV.java | 1 -
.../lang/gridfunc/AlwaysFalsePredicate.java | 46 +
.../util/lang/gridfunc/AlwaysTruePredicate.java | 46 +
.../util/lang/gridfunc/AlwaysTrueReducer.java | 56 +
.../gridfunc/AtomicIntegerFactoryCallable.java | 40 +
.../gridfunc/CacheEntryGetValueClosure.java | 42 +
.../gridfunc/CacheEntryHasPeekPredicate.java | 41 +
.../lang/gridfunc/ClusterNodeGetIdClosure.java | 41 +
.../ConcurrentDequeFactoryCallable.java | 40 +
.../ConcurrentHashSetFactoryCallable.java | 40 +
.../gridfunc/ConcurrentMapFactoryCallable.java | 41 +
.../lang/gridfunc/ContainsNodeIdsPredicate.java | 52 +
.../util/lang/gridfunc/ContainsPredicate.java | 55 +
.../gridfunc/EntryByKeyEvaluationPredicate.java | 53 +
.../gridfunc/EqualsClusterNodeIdPredicate.java | 51 +
.../util/lang/gridfunc/EqualsUuidPredicate.java | 50 +
.../lang/gridfunc/FlatCollectionWrapper.java | 66 +
.../util/lang/gridfunc/FlatIterator.java | 104 ++
.../util/lang/gridfunc/HasEqualIdPredicate.java | 51 +
.../lang/gridfunc/HasNotEqualIdPredicate.java | 51 +
.../util/lang/gridfunc/IdentityClosure.java | 39 +
.../util/lang/gridfunc/IntSumReducer.java | 51 +
.../util/lang/gridfunc/IsAllPredicate.java | 52 +
.../util/lang/gridfunc/IsNotAllPredicate.java | 52 +
.../util/lang/gridfunc/IsNotNullPredicate.java | 44 +
.../util/lang/gridfunc/LongSumReducer.java | 51 +
.../util/lang/gridfunc/MapFactoryCallable.java | 41 +
.../util/lang/gridfunc/MultipleIterator.java | 106 ++
.../util/lang/gridfunc/NoOpClosure.java | 39 +
.../lang/gridfunc/NotContainsPredicate.java | 54 +
.../util/lang/gridfunc/NotEqualPredicate.java | 53 +
.../lang/gridfunc/PredicateCollectionView.java | 78 ++
.../util/lang/gridfunc/PredicateMapView.java | 121 ++
.../util/lang/gridfunc/PredicateSetView.java | 153 +++
.../lang/gridfunc/ReadOnlyCollectionView.java | 95 ++
.../lang/gridfunc/ReadOnlyCollectionView2X.java | 100 ++
.../lang/gridfunc/RunnableWrapperClosure.java | 51 +
.../util/lang/gridfunc/SetFactoryCallable.java | 41 +
.../util/lang/gridfunc/StringConcatReducer.java | 79 ++
.../util/lang/gridfunc/ToStringClosure.java | 42 +
.../lang/gridfunc/TransformCollectionView.java | 79 ++
.../gridfunc/TransformFilteringIterator.java | 138 ++
.../util/lang/gridfunc/TransformMapView.java | 168 +++
.../util/lang/gridfunc/TransformMapView2.java | 165 +++
.../util/lang/gridfunc/package-info.java | 22 +
.../org/apache/ignite/lang/IgniteFuture.java | 15 -
.../TransactionDeadlockException.java | 4 +-
.../transactions/TransactionException.java | 80 ++
.../TransactionHeuristicException.java | 4 +-
.../TransactionOptimisticException.java | 4 +-
.../TransactionRollbackException.java | 4 +-
.../TransactionTimeoutException.java | 4 +-
...inityFunctionFastPowerOfTwoHashSelfTest.java | 17 -
...ndezvousAffinityFunctionSimpleBenchmark.java | 1100 ++++++++++++++++
...ousAffinityFunctionStandardHashSelfTest.java | 17 -
.../IgniteClientReconnectCacheTest.java | 16 +-
.../internal/binary/BinaryEnumsSelfTest.java | 2 +
.../cache/CacheKeepBinaryTransactionTest.java | 121 ++
.../GridCacheOrderedPreloadingSelfTest.java | 48 +-
.../GridCachePartitionedAffinitySpreadTest.java | 169 ---
...SemaphoreFailoverSafeReleasePermitsTest.java | 9 +-
...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +-
.../near/GridCacheNearTxForceKeyTest.java | 6 +-
...cheRebalancingPartitionDistributionTest.java | 2 +-
.../datastreamer/DataStreamerImplSelfTest.java | 36 -
...gniteServiceConfigVariationsFullApiTest.java | 9 +-
.../IgniteServiceDynamicCachesSelfTest.java | 24 +-
.../util/future/IgniteFutureImplTest.java | 38 -
.../ignite/testframework/GridTestNode.java | 12 +-
.../testsuites/IgniteCacheTestSuite5.java | 2 +
.../external/HadoopExternalTaskExecutor.java | 2 +-
...PartitionOnAffinityRunAtomicCacheOpTest.java | 46 +-
.../h2/GridIndexingSpiAbstractSelfTest.java | 2 -
.../Binary/BinaryBuilderSelfTest.cs | 4 +-
.../Binary/BinaryDynamicRegistrationTest.cs | 11 +-
.../Binary/BinarySelfTest.cs | 4 +-
.../IgniteConfigurationSerializerTest.cs | 15 +-
.../Binary/BinaryConfiguration.cs | 24 +-
.../IgniteConfigurationSection.xsd | 8 +-
.../Binary/BinarySurrogateTypeDescriptor.cs | 10 +-
.../Impl/Binary/BinaryUtils.cs | 8 +-
.../Impl/Binary/Marshaller.cs | 14 +-
.../processors/schedule/ScheduleFutureImpl.java | 20 -
125 files changed, 5727 insertions(+), 2458 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 47cd9fe,1a7c2c6..388a434
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@@ -38,7 -38,7 +38,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 4f75480,23d7657..4faa475
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@@ -27,8 -27,8 +27,9 @@@ import org.apache.ignite.IgniteCheckedE
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 70fa1d1,964d423..6e7b324
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -42,7 -42,7 +42,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+ import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 46fb30c,8512298..4442b3a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -208,9 -202,8 +208,9 @@@ public final class GridDhtColocatedLock
this.filter = filter;
this.skipStore = skipStore;
this.keepBinary = keepBinary;
+ this.recovery = recovery;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 103bd49,55aca2a..f6827ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -187,11 -171,8 +187,8 @@@ public class GridDhtPartitionsExchangeF
@GridToStringInclude
private volatile IgniteInternalFuture<?> partReleaseFut;
- /** */
- private final Object mux = new Object();
-
/** Logger. */
- private IgniteLogger log;
+ private final IgniteLogger log;
/** Dynamic cache change requests. */
private Collection<DynamicCacheChangeRequest> reqs;
@@@ -1446,10 -1216,10 +1443,10 @@@
if (updateSingleMap) {
try {
- updatePartitionSingleMap(msg);
+ updatePartitionSingleMap(node, msg);
}
finally {
- synchronized (mux) {
+ synchronized (this) {
assert pendingSingleUpdates > 0;
pendingSingleUpdates--;
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index eb953d9,8de01c9..b94c014
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -214,9 -208,8 +214,9 @@@ public final class GridNearLockFuture e
this.filter = filter;
this.skipStore = skipStore;
this.keepBinary = keepBinary;
+ this.recovery = recovery;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
index 0000000,3e5bae9..16f8e97
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
@@@ -1,0 -1,1100 +1,1100 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.cache.affinity.rendezvous;
+
+ import java.io.Externalizable;
+ import java.io.ObjectInput;
+ import java.io.ObjectOutput;
+ import java.io.Serializable;
+ import java.security.MessageDigest;
+ import java.security.NoSuchAlgorithmException;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import org.apache.ignite.Ignite;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.IgniteException;
+ import org.apache.ignite.IgniteLogger;
+ import org.apache.ignite.cache.affinity.AffinityFunction;
+ import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+ import org.apache.ignite.cache.affinity.AffinityNodeHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.configuration.CacheConfiguration;
+ import org.apache.ignite.configuration.IgniteConfiguration;
+ import org.apache.ignite.events.DiscoveryEvent;
+ import org.apache.ignite.events.EventType;
+ import org.apache.ignite.internal.IgniteNodeAttributes;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+ import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+ import org.apache.ignite.internal.util.typedef.F;
+ import org.apache.ignite.internal.util.typedef.internal.A;
+ import org.apache.ignite.internal.util.typedef.internal.LT;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteBiPredicate;
+ import org.apache.ignite.lang.IgniteBiTuple;
+ import org.apache.ignite.resources.IgniteInstanceResource;
+ import org.apache.ignite.resources.LoggerResource;
+ import org.apache.ignite.testframework.GridTestNode;
+ import org.apache.ignite.testframework.GridTestUtils;
+ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+ import java.io.IOException;
+ import java.io.PrintStream;
+ import java.nio.file.FileSystems;
+ import java.nio.file.Files;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;
+
+ /**
+ * Simple benchmarks, compatibility test and distribution check utils for affinity functions.
+ * Used to verify the effect of changes made to {@link RendezvousAffinityFunction}.
+ */
+ public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest {
+ /** MAC prefix. */
+ private static final String MAC_PREF = "MAC";
+
+ /** Ignite. */
+ private static Ignite ignite;
+
+ /** Max experiments. */
+ private static final int MAX_EXPERIMENTS = 200;
+
+ /** Topology modification mode applied before each affinity calculation. */
+ private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code 3 * 3600 * 1000}, which is three hours if the value is interpreted as milliseconds
+ * (presumably so; the unit is defined by the overridden method, which is not visible here).
+ */
+ @Override protected long getTestTimeout() {
+ return 3 * 3600 * 1000;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts a grid via {@code startGrid()} and stores the returned instance in the static {@code ignite} field,
+ * from where the tests of this class read it.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ ignite = startGrid();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops every started grid by calling {@code stopAllGrids()}.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Builds the initial topology out of test nodes with random IDs.
+ * Each consecutive pair of nodes receives the same MACs attribute value ({@code MAC_PREF + i / 2}).
+ *
+ * @param nodesCnt Count of nodes to generate.
+ * @return Nodes list.
+ */
+ private List<ClusterNode> createBaseNodes(int nodesCnt) {
+ List<ClusterNode> res = new ArrayList<>(nodesCnt);
+
+ int idx = 0;
+
+ while (idx < nodesCnt) {
+ GridTestNode testNode = new GridTestNode(UUID.randomUUID());
+
+ // Integer division: nodes 2k and 2k + 1 get the same MAC label.
+ String macs = MAC_PREF + idx / 2;
+
+ testNode.setAttribute(IgniteNodeAttributes.ATTR_MACS, macs);
+
+ res.add(testNode);
+
+ idx++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Modifies the topology in place: a new node is added on even iterations,
+ * the last node of the list is removed on odd ones.
+ *
+ * @param nodes Topology.
+ * @param prevAssignment Previous affinity.
+ * @param iter Number of iteration.
+ * @param backups Backups count.
+ * @return Affinity context.
+ */
+ private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes,
+ List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+ DiscoveryEvent evt;
+
+ if (iter % 2 != 0)
+ evt = removeNode(nodes, nodes.size() - 1);
+ else
+ evt = addNode(nodes, iter);
+
+ // The topology version is built from the node count after the modification.
+ AffinityTopologyVersion topVer = new AffinityTopologyVersion(nodes.size());
+
+ return new GridAffinityFunctionContextImpl(nodes, prevAssignment, evt, topVer, backups);
+ }
+
+ /**
+ * @param nodes Topology.
+ * @param idx Index of node to remove.
+ * @return Discovery event.
+ */
+ @NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) {
+ return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx));
+ }
+
+ /**
+ * Modifies the topology in place: a new node is added on even iterations,
+ * the first node of the list is removed on odd ones.
+ *
+ * @param nodes Topology.
+ * @param prevAssignment Previous affinity.
+ * @param iter Number of iteration.
+ * @param backups Backups count.
+ * @return Affinity context.
+ */
+ private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes,
+ List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+ DiscoveryEvent evt;
+
+ if (iter % 2 != 0)
+ evt = removeNode(nodes, 0);
+ else
+ evt = addNode(nodes, iter);
+
+ // The topology version is built from the node count after the modification.
+ AffinityTopologyVersion topVer = new AffinityTopologyVersion(nodes.size());
+
+ return new GridAffinityFunctionContextImpl(nodes, prevAssignment, evt, topVer, backups);
+ }
+
+ /**
+ * Appends a new test node with a random ID to the topology and creates an {@code EVT_NODE_JOINED}
+ * discovery event for it.
+ *
+ * @param nodes Topology.
+ * @param iter Iteration count.
+ * @return Discovery event.
+ */
+ @NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) {
+ GridTestNode joined = new GridTestNode(UUID.randomUUID());
+
+ // The MAC label of an added node depends on iter / 4 (integer division).
+ String macs = MAC_PREF + "_add_" + iter / 4;
+
+ joined.setAttribute(IgniteNodeAttributes.ATTR_MACS, macs);
+
+ nodes.add(joined);
+
+ ClusterNode first = nodes.get(0);
+
+ return new DiscoveryEvent(first, "", EventType.EVT_NODE_JOINED, joined);
+ }
+
+ /**
+ * Modifies the topology according to the current {@code mode}, builds the affinity context for the
+ * resulting topology and measures how long {@code aff.assignPartitions(ctx)} takes.
+ * <p>
+ * The {@code NONE} branch leaves the topology unchanged. The {@code REMOVE_RANDOM} branch, despite its name,
+ * removes the last node of the list (index {@code nodes.size() - 1}). If {@code mode} matches none of the
+ * listed cases, {@code ctx} stays {@code null} and is passed to the affinity function as is.
+ * <p>
+ * Only the {@code aff.assignPartitions(ctx)} call is timed; the topology modification is not. The elapsed
+ * time is the difference of two {@code System.currentTimeMillis()} readings.
+ *
+ * @param aff Affinity function.
+ * @param nodes Topology (modified in place by most modes).
+ * @param prevAssignment Previous affinity assignment.
+ * @param backups Backups count.
+ * @param iter Number of iteration.
+ * @return Tuple with the time spent on the affinity calculation and the calculated affinity.
+ */
+ private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff,
+ List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) {
+
+ GridAffinityFunctionContextImpl ctx = null;
+ switch (mode) {
+ case CHANGE_LAST_NODE:
+ ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups);
+ break;
+ case CHANGE_FIRST_NODE:
+ ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups);
+ break;
+
+ case ADD:
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ case REMOVE_RANDOM:
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment, removeNode(nodes, nodes.size() - 1),
+ new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ case NONE:
+ ctx = new GridAffinityFunctionContextImpl(nodes,
+ prevAssignment,
+ new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)),
+ new AffinityTopologyVersion(nodes.size()), backups);
+ break;
+
+ }
+
+ long start = System.currentTimeMillis();
+
+ List<List<ClusterNode>> assignments = aff.assignPartitions(ctx);
+
+ return F.t(System.currentTimeMillis() - start, assignments);
+ }
+
+ /**
+ * Computes the arithmetic mean of the measures.
+ *
+ * @param lst List of measures.
+ * @return Average of measures, or {@code 0} for an empty collection.
+ */
+ private double average(Collection<Long> lst) {
+ int cnt = lst.size();
+
+ if (cnt == 0)
+ return 0;
+
+ long total = 0;
+
+ for (Long val : lst)
+ total += val;
+
+ return total / (double)cnt;
+ }
+
+ /**
+ * Computes the spread of the measures around the given average.
+ * <p>
+ * Despite the method name, the returned value is the square root of the mean squared deviation,
+ * i.e. the population standard deviation of the measures.
+ *
+ * @param lst List of measures.
+ * @param avg Average of the measures.
+ * @return Square root of the mean squared deviation from {@code avg}, or {@code 0} for an empty collection.
+ */
+ private double variance(Collection<Long> lst, double avg) {
+ if (lst.isEmpty())
+ return 0;
+
+ // Accumulate in double: with a long accumulator the compound assignment would silently
+ // narrow every squared deviation to long and drop its fractional part.
+ double sum = 0;
+
+ for (long l : lst) {
+ double dev = l - avg;
+
+ sum += dev * dev;
+ }
+
+ return Math.sqrt(sum / lst.size());
+ }
+
+ /**
+ * Builds the table with the count of partitions per node:
+ *
+ * column 0 - primary partitions counts
+ * column 1 - backup#0 partitions counts
+ * etc
+ *
+ * Rows correspond to the nodes, in the iteration order of {@code nodes}. The number of columns is taken
+ * from the size of the first element of {@code lst}.
+ *
+ * @param lst Affinity result.
+ * @param nodes Topology.
+ * @return Frequency distribution: counts of partitions on node.
+ */
+ private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) {
+ int copies = lst.get(0).size();
+
+ // One counter map per column (position inside the per-partition node list).
+ List<Map<ClusterNode, AtomicInteger>> cntsByCopy = new ArrayList<>();
+
+ for (int copyIdx = 0; copyIdx < copies; copyIdx++) {
+ Map<ClusterNode, AtomicInteger> cnts = new HashMap<>();
+
+ for (List<ClusterNode> partNodes : lst) {
+ ClusterNode owner = partNodes.get(copyIdx);
+
+ AtomicInteger cnt = cnts.get(owner);
+
+ if (cnt != null)
+ cnt.incrementAndGet();
+ else
+ cnts.put(owner, new AtomicInteger(1));
+ }
+
+ cntsByCopy.add(cnts);
+ }
+
+ List<List<Integer>> res = new ArrayList<>(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ List<Integer> row = new ArrayList<>(copies);
+
+ for (Map<ClusterNode, AtomicInteger> cnts : cntsByCopy) {
+ AtomicInteger cnt = cnts.get(node);
+
+ // A node that owns no partition in this column gets an explicit zero.
+ row.add(cnt == null ? 0 : cnt.get());
+ }
+
+ res.add(row);
+ }
+
+ return res;
+ }
+
+ /**
+ * Writes the frequency distribution to a file of the default file system.
+ * <p>
+ * The file path is the number of rows formatted as {@code %03d} followed by {@code suffix}. Every row of
+ * {@code byNodes} becomes one output line; each value is formatted as {@code %05d} followed by a space.
+ * The stream is closed by the try-with-resources statement.
+ *
+ * @param byNodes Frequency distribution.
+ * @param suffix Label suffix.
+ * @throws IOException On error.
+ */
+ private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException {
+ int nodes = byNodes.size();
+
+ try (PrintStream ps = new PrintStream(Files.newOutputStream(FileSystems.getDefault()
+ .getPath(String.format("%03d", nodes) + suffix)))) {
+
+ for (List<Integer> byNode : byNodes) {
+ for (int w : byNode)
+ ps.print(String.format("%05d ", w));
+
+ ps.println("");
+ }
+ }
+ }
+
+ /**
+ * Chi-square statistic of the primary partition distribution against the uniform distribution.
+ * Only column 0 of every row (the primary partitions count) takes part in the calculation.
+ *
+ * @param byNodes Distribution.
+ * @param parts Partitions count.
+ * @param goldenNodeWeight Weight of a node according to the uniform distribution.
+ * @return Chi-square test.
+ */
+ private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) {
+ double chi = 0;
+
+ for (List<Integer> nodeCnts : byNodes) {
+ // Fraction of all partitions for which this node is the primary.
+ double actual = (double)nodeCnts.get(0) / parts;
+
+ double dev = goldenNodeWeight - actual;
+
+ chi += dev * dev / goldenNodeWeight;
+ }
+
+ return chi;
+ }
+
+ /**
+ * Compares the partition distribution produced by {@link RendezvousAffinityFunction} with the one produced
+ * by {@code RendezvousAffinityFunctionOld}. Both functions are created with the arguments
+ * {@code (true, 1024)}; the {@code ignite} field of the old function is set through
+ * {@code GridTestUtils.setFieldValue} before the comparison is started.
+ *
+ * @throws IOException On error.
+ */
+ public void testDistribution() throws IOException {
+ AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024);
+
+ AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ GridTestUtils.setFieldValue(aff1, "ignite", ignite);
+
+ affinityDistribution(aff0, aff1);
+ }
+
+ /**
+ * Compares the partition distributions of two affinity functions for a series of topology sizes.
+ * <p>
+ * For every size each function is run twice on its own freshly created topology (iterations 0 and 1);
+ * the result of the first run is discarded and the second one is analysed. The chi-square statistic of
+ * both distributions is logged and both distribution tables are written to files named after the node
+ * count and the simple class name of the function.
+ *
+ * @param aff0 Affinity function to compare.
+ * @param aff1 Affinity function to compare.
+ */
+ private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) {
+ int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600};
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ assignPartitions(aff0, nodes0, null, 2, 0).get2();
+ List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2();
+
+ assignPartitions(aff1, nodes1, null, 2, 0).get2();
+ List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2();
+
+ List<List<Integer>> dist0 = freqDistribution(lst0, nodes0);
+ List<List<Integer>> dist1 = freqDistribution(lst1, nodes1);
+
+ // Each distribution is normalised by the partition count of the function that produced it.
+ info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;",
+ nodesCnt,
+ aff0.getClass().getSimpleName(),
+ chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt),
+ aff1.getClass().getSimpleName(),
+ chiSquare(dist1, aff1.partitions(), 1.0 / nodesCnt)));
+
+ try {
+ printDistribution(dist0, "." + aff0.getClass().getSimpleName());
+ printDistribution(dist1, "." + aff1.getClass().getSimpleName());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Benchmarks {@code RendezvousAffinityFunctionOld} against {@link RendezvousAffinityFunction} with the
+ * topology modification mode set to {@code ADD}. Both functions are created with the arguments
+ * {@code (true, 1024)}; the {@code ignite} field of the old function is set through
+ * {@code GridTestUtils.setFieldValue}.
+ */
+ public void testAffinityBenchmarkAdd() {
+ mode = TopologyModificationMode.ADD;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ GridTestUtils.setFieldValue(aff0, "ignite", ignite);
+
+ affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
+ }
+
+ /**
+ * Benchmarks {@code RendezvousAffinityFunctionOld} against {@link RendezvousAffinityFunction} with the
+ * topology modification mode set to {@code CHANGE_LAST_NODE}. Both functions are created with the
+ * arguments {@code (true, 1024)}; the {@code ignite} field of the old function is set through
+ * {@code GridTestUtils.setFieldValue}.
+ */
+ public void testAffinityBenchmarkChangeLast() {
+ mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
+
+ GridTestUtils.setFieldValue(aff0, "ignite", ignite);
+
+ affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
+ }
+
+ /**
+ * Measures the affinity calculation time of two affinity functions for a series of topology sizes.
+ * <p>
+ * For every size each function gets its own freshly created topology. After one initial calculation
+ * without a previous assignment, {@code MAX_EXPERIMENTS} calculations are performed; each of them receives
+ * the assignment produced by the preceding one, and the topology is modified by {@code assignPartitions}
+ * according to the current mode. The average time and the value returned by {@code variance} are logged
+ * for both functions.
+ * <p>
+ * NOTE(review): the series of node counts starts with 100, which is repeated later in the series;
+ * presumably the first entry serves as a warm-up run - confirm.
+ *
+ * @param aff0 Affinity function to compare.
+ * @param aff1 Affinity function to compare.
+ */
+ private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) {
+ int[] nodesCnts = {100, 4, 100, 200, 300, 400, 500, 600};
+
+ final int backups = 2;
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ List<Long> times0 = new ArrayList<>(MAX_EXPERIMENTS);
+ List<Long> times1 = new ArrayList<>(MAX_EXPERIMENTS);
+
+ List<List<ClusterNode>> prevAssignment =
+ assignPartitions(aff0, nodes0, null, backups, 0).get2();
+
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ IgniteBiTuple<Long, List<List<ClusterNode>>> aa
+ = assignPartitions(aff0, nodes0, prevAssignment, backups, i);
+
+ prevAssignment = aa.get2();
+
+ times0.add(aa.get1());
+ }
+
+ prevAssignment = assignPartitions(aff1, nodes1, null, backups, 0).get2();
+
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ IgniteBiTuple<Long, List<List<ClusterNode>>> aa
+ = assignPartitions(aff1, nodes1, prevAssignment, backups, i);
+
+ prevAssignment = aa.get2();
+
+ times1.add(aa.get1());
+ }
+
+ double avr0 = average(times0);
+ double var0 = variance(times0, avr0);
+
+ double avr1 = average(times1);
+ double var1 = variance(times1, avr1);
+
+ info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;",
+ nodesCnt,
+ aff0.getClass().getSimpleName(),
+ avr0, var0,
+ aff1.getClass().getSimpleName(),
+ avr1, var1));
+ }
+ }
+
+ /**
+ * Counts, over all partitions, the distinct nodes that are present in the new assignment
+ * of a partition but absent from the old assignment of the same partition.
+ *
+ * @param affOld Old affinity.
+ * @param affNew New affinity.
+ * @return Count of partitions to migrate, {@code 0} if any of the assignments is {@code null}.
+ */
+ private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) {
+ if (affOld == null || affNew == null)
+ return 0;
+
+ assertEquals(affOld.size(), affNew.size());
+
+ int migrated = 0;
+
+ for (int part = 0; part < affOld.size(); part++) {
+ // Owners of the partition that appear only in the new assignment.
+ Collection<ClusterNode> added = new HashSet<>(affNew.get(part));
+
+ added.removeAll(new HashSet<>(affOld.get(part)));
+
+ migrated += added.size();
+ }
+
+ return migrated;
+ }
+
+ /**
+ * Logs, for every tested cluster size, how many partition owners change between consecutive
+ * assignments of two affinity functions, next to a reference ("golden") value computed as
+ * {@code partitions / nodesCnt * (backups + 1)}.
+ * <p>
+ * For each function the differences reported by {@code countPartitionsToMigrate()} are accumulated
+ * over {@code MAX_EXPERIMENTS} consecutive assignments and the sum is divided by
+ * {@code MAX_EXPERIMENTS - 1} before it is logged.
+ * <p>
+ * NOTE: both compared functions are currently {@code RendezvousAffinityFunction} instances with
+ * the same settings (see the TODO in the body), so the two logged series describe the same
+ * implementation. The method only logs the values and asserts nothing about them.
+ */
+ public void testPartitionsMigrate() {
+ int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600};
+
+ final int backups = 2;
+
+ AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256);
- AffinityFunction aff1 = new FairAffinityFunction(true, 256);
++ // TODO choose another affinity function to compare.
++ AffinityFunction aff1 = new RendezvousAffinityFunction(true, 256);
+
+ for (int nodesCnt : nodesCnts) {
+ List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+ List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+ List<List<ClusterNode>> affPrev = null;
+
+ int diffCnt0 = 0;
+
+ affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2();
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2();
+ diffCnt0 += countPartitionsToMigrate(affPrev, affCur);
+ affPrev = affCur;
+ }
+
+ affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2();
+ int diffCnt1 = 0;
+ for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+ List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2();
+ diffCnt1 += countPartitionsToMigrate(affPrev, affCur);
+ affPrev = affCur;
+ }
+
+ double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1);
+ info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;",
+ nodesCnt, goldenChangeAffinity,
+ aff0.getClass().getSimpleName(),
+ (double)diffCnt0 / (MAX_EXPERIMENTS - 1),
+ aff1.getClass().getSimpleName(),
+ (double)diffCnt1 / (MAX_EXPERIMENTS - 1)));
+ }
+ }
+
+ /**
+ * Compares the assignments of the current {@code RendezvousAffinityFunction} with the assignments
+ * of the old implementation kept in this test.
+ * <p>
+ * The mode assigned here is replaced with {@link TopologyModificationMode#NONE}
+ * by {@code affinityCompatibility()} before any assignment is calculated.
+ */
+ public void _testAffinityCompatibility() {
+ mode = TopologyModificationMode.ADD;
+
+ AffinityFunction curAff = new RendezvousAffinityFunction(true, 1024);
+
+ // Use the full copy of the old implementation of the RendezvousAffinityFunction to check the compatibility.
+ AffinityFunction oldAff = new RendezvousAffinityFunctionOld(true, 1024);
+
+ GridTestUtils.setFieldValue(oldAff, "ignite", ignite);
+
+ affinityCompatibility(curAff, oldAff);
+ }
+
+ /**
+ * Asserts that both affinity functions produce equal assignments for the same list of nodes.
+ * The check is repeated for several cluster sizes with the {@link TopologyModificationMode#NONE} mode set.
+ *
+ * @param aff0 First affinity function to compare.
+ * @param aff1 Second affinity function to compare.
+ */
+ private void affinityCompatibility(AffinityFunction aff0, AffinityFunction aff1) {
+ final int backups = 2;
+
+ mode = TopologyModificationMode.NONE;
+
+ for (int nodesCnt : new int[] {64, 100, 200, 300, 400, 500, 600}) {
+ // Both functions get the very same node list.
+ List<ClusterNode> top = createBaseNodes(nodesCnt);
+
+ List<List<ClusterNode>> expected = assignPartitions(aff0, top, null, backups, 0).get2();
+ List<List<ClusterNode>> actual = assignPartitions(aff1, top, null, backups, 0).get2();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ /**
+ * Topology modification mode selected by the tests through the {@code mode} field.
+ * NOTE(review): the code that interprets the mode is not visible in this chunk; presumably it is
+ * the {@code assignPartitions()} test helper - confirm.
+ */
+ private enum TopologyModificationMode {
+ /** Change the last node. */
+ CHANGE_LAST_NODE,
+
+ /** Change the first node. */
+ CHANGE_FIRST_NODE,
+
+ /** Add. */
+ ADD,
+
+ /** Remove random. */
+ REMOVE_RANDOM,
+
+ /** Do nothing. */
+ NONE
+ }
+
+ /**
+ * Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance.
+ * <p>
+ * The class serves as the reference the current implementation is compared against, so its logic
+ * is expected to stay as it is; only the documentation is maintained here.
+ */
+ private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Default number of partitions. */
+ public static final int DFLT_PARTITION_COUNT = 1024;
+
+ /** Comparator. */
+ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
+
+ /** Thread local message digest (an MD5 instance is created lazily for every thread). */
+ private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
+ @Override protected MessageDigest initialValue() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ assert false : "Should have failed in constructor";
+
+ throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
+ }
+ }
+ };
+
+ /** Number of partitions. */
+ private int parts;
+
+ /** Exclude neighbors flag. */
+ private boolean exclNeighbors;
+
+ /** Exclude neighbors warning. */
+ private transient boolean exclNeighborsWarn;
+
+ /** Optional backup filter. First node is primary, second node is a node being tested. */
+ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+ /** Optional affinity backups filter. The first node is a node being tested,
+ * the second is a list of nodes that are already assigned for a given partition (the first node in the list
+ * is primary). */
+ private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
+
+ /** Hash ID resolver. */
+ private AffinityNodeHashResolver hashIdRslvr = null;
+
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Logger instance. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /**
+ * Empty constructor with all defaults.
+ */
+ public RendezvousAffinityFunctionOld() {
+ this(false);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and the default number of partitions ({@link #DFLT_PARTITION_COUNT}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ */
+ public RendezvousAffinityFunctionOld(boolean exclNeighbors) {
+ this(exclNeighbors, DFLT_PARTITION_COUNT);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and specified number of partitions.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ * @param parts Total number of partitions.
+ */
+ public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) {
+ this(exclNeighbors, parts, null);
+ }
+
+ /**
+ * Initializes affinity with specified number of partitions and optional backup filter.
+ * The exclude neighbors flag is set to {@code false}.
+ *
+ * @param parts Total number of partitions.
+ * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
+ * from all nodes that pass this filter. First argument for this filter is primary node, and second
+ * argument is node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ */
+ public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this(false, parts, backupFilter);
+ }
+
+ /**
+ * Private constructor.
+ * <p>
+ * Requires {@code parts > 0} and checks that an MD5 message digest can be obtained, so that
+ * the problem is reported at construction time rather than on the first assignment.
+ *
+ * @param exclNeighbors Exclude neighbors flag.
+ * @param parts Partitions count.
+ * @param backupFilter Backup filter.
+ * @throws IgniteException If MD5 message digest is not available.
+ */
+ private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts,
+ IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ A.ensure(parts > 0, "parts > 0");
+
+ this.exclNeighbors = exclNeighbors;
+ this.parts = parts;
+ this.backupFilter = backupFilter;
+
+ try {
+ MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
+ }
+ }
+
+ /**
+ * Gets total number of key partitions. To ensure that all partitions are
+ * equally distributed across all nodes, please make sure that this
+ * number is significantly larger than a number of nodes. Also, partition
+ * size should be relatively small. Try to avoid having partitions with more
+ * than quarter million keys.
+ * <p>
+ * Note that for fully replicated caches this method should always
+ * return {@code 1}.
+ *
+ * @return Total partition count.
+ */
+ public int getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets total number of partitions.
+ * <p>
+ * NOTE: only the upper bound ({@code CacheConfiguration.MAX_PARTITIONS_COUNT}) is validated here;
+ * unlike the constructor, this setter does not check that {@code parts > 0}.
+ *
+ * @param parts Total number of partitions.
+ */
+ public void setPartitions(int parts) {
+ A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
+ this.parts = parts;
+ }
+
+ /**
+ * Gets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed on different locations in the hash ring, and hence causing
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts node on the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @return Hash ID resolver.
+ *
+ * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead.
+ */
+ @Deprecated
+ public AffinityNodeHashResolver getHashIdResolver() {
+ return hashIdRslvr;
+ }
+
+ /**
+ * Sets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed on different locations in the hash ring, and hence causing
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts node on the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @param hashIdRslvr Hash ID resolver.
+ *
+ * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead.
+ */
+ @Deprecated
+ public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) {
+ this.hashIdRslvr = hashIdRslvr;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+ return backupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param backupFilter Optional backup filter.
+ * @deprecated Use {@code affinityBackupFilter} instead.
+ */
+ @Deprecated
+ public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is a node being tested,
+ * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+ * the first in the list).
+ * <p>
+ * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() {
+ return affinityBackupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is a node being tested,
+ * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+ * the first in the list).
+ * <p>
+ * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param affinityBackupFilter Optional backup filter.
+ */
+ public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode,
+ List<ClusterNode>> affinityBackupFilter) {
+ this.affinityBackupFilter = affinityBackupFilter;
+ }
+
+ /**
+ * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public boolean isExcludeNeighbors() {
+ return exclNeighbors;
+ }
+
+ /**
+ * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public void setExcludeNeighbors(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
+ }
+
+ /**
+ * Resolves node hash: the value returned by the hash ID resolver if one is set,
+ * the consistent ID of the node otherwise.
+ *
+ * @param node Cluster node.
+ * @return Node hash.
+ */
+ public Object resolveNodeHash(ClusterNode node) {
+ if (hashIdRslvr != null)
+ return hashIdRslvr.resolve(node);
+ else
+ return node.consistentId();
+ }
+
+ /**
+ * Returns collection of nodes (primary first) for specified partition.
+ * <p>
+ * If the topology has at most one node, the passed {@code nodes} list itself is returned.
+ * Otherwise a {@code long} hash is calculated for every node: the marshalled node hash is stored
+ * in a byte array behind a 4 byte prefix, the partition number is put into the prefix,
+ * the array is digested and the first 8 bytes of the digest are combined into the hash
+ * (least significant byte first). The nodes are sorted with {@link #COMPARATOR} and the first
+ * one becomes primary.
+ * <p>
+ * Backups are taken from the remaining nodes in the sorted order:
+ * <ul>
+ * <li>if neighbors are excluded, a node is taken unless it is among the neighbors of
+ * the nodes selected so far; both filters are not consulted in this case;</li>
+ * <li>otherwise a node is taken if {@code affinityBackupFilter} is set and accepts it, or
+ * {@code backupFilter} is set and accepts it, or none of the filters is set.</li>
+ * </ul>
+ * If neighbors are excluded and there were not enough suitable nodes although the topology is
+ * large enough, the sorted list is iterated again and nodes are added regardless of the neighbors;
+ * a warning is logged once per instance in this case.
+ *
+ * @param d Message digest, if {@code null} the thread local one is used.
+ * @param part Partition.
+ * @param nodes Nodes.
+ * @param nodesHash Serialized nodes hashes. The map is used as a cache and is modified: missing
+ * entries are added and the first 4 bytes of the used arrays are overwritten on every call.
+ * @param backups Number of backups, {@code Integer.MAX_VALUE} stands for all nodes.
+ * @param neighborhoodCache Neighborhood.
+ * @return Assignment.
+ */
+ public List<ClusterNode> assignPartition(MessageDigest d,
+ int part,
+ List<ClusterNode> nodes,
+ Map<ClusterNode, byte[]> nodesHash,
+ int backups,
+ @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+ if (nodes.size() <= 1)
+ return nodes;
+
+ if (d == null)
+ d = digest.get();
+
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
+
+ try {
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ byte[] nodeHashBytes = nodesHash.get(node);
+
+ if (nodeHashBytes == null) {
+ Object nodeHash = resolveNodeHash(node);
+
+ byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+ // Add 4 bytes for partition bytes.
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+ System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+ nodesHash.put(node, nodeHashBytes);
+ }
+
+ U.intToBytes(part, nodeHashBytes, 0);
+
+ d.reset();
+
+ byte[] bytes = d.digest(nodeHashBytes);
+
+ long hash =
+ (bytes[0] & 0xFFL)
+ | ((bytes[1] & 0xFFL) << 8)
+ | ((bytes[2] & 0xFFL) << 16)
+ | ((bytes[3] & 0xFFL) << 24)
+ | ((bytes[4] & 0xFFL) << 32)
+ | ((bytes[5] & 0xFFL) << 40)
+ | ((bytes[6] & 0xFFL) << 48)
+ | ((bytes[7] & 0xFFL) << 56);
+
+ lst.add(F.t(hash, node));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ Collections.sort(lst, COMPARATOR);
+
+ int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+
+ List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
+
+ ClusterNode primary = lst.get(0).get2();
+
+ res.add(primary);
+
+ // Select backups.
+ if (backups > 0) {
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ if (exclNeighbors) {
+ Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
+
+ if (!allNeighbors.contains(node))
+ res.add(node);
+ }
+ else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+ res.add(next.get2());
+ else if (backupFilter != null && backupFilter.apply(primary, node))
+ res.add(next.get2());
+ else if (affinityBackupFilter == null && backupFilter == null)
+ res.add(next.get2());
+ }
+ }
+
+ if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
+ // Iterate again in case there are not enough nodes that pass the exclude neighbors criteria.
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ if (!res.contains(node))
+ res.add(next.get2());
+ }
+
+ if (!exclNeighborsWarn) {
+ LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
+ "because topology has no enough nodes to assign backups.");
+
+ exclNeighborsWarn = true;
+ }
+ }
+
+ assert res.size() <= primaryAndBackups;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ if (key == null)
+ throw new IllegalArgumentException("Null key is passed for a partition calculation. " +
+ "Make sure that an affinity key that is used is initialized properly.");
+
+ return U.safeAbs(key.hashCode() % parts);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Calls {@code assignPartition()} for every partition with the current topology snapshot;
+ * one message digest and one cache of serialized node hashes are shared by all the calls.
+ */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+ Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+ GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+
+ MessageDigest d = digest.get();
+
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
+ for (int i = 0; i < parts; i++) {
+ List<ClusterNode> partAssignment = assignPartition(d,
+ i,
+ nodes,
+ nodesHash,
+ affCtx.backups(),
+ neighborhoodCache);
+
+ assignments.add(partAssignment);
+ }
+
+ return assignments;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the partitions count, the exclude neighbors flag, the hash ID resolver and the backup filter.
+ * NOTE: {@code affinityBackupFilter} is not written.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(parts);
+ out.writeBoolean(exclNeighbors);
+ out.writeObject(hashIdRslvr);
+ out.writeObject(backupFilter);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the fields in the order they are written by {@code writeExternal()}.
+ * NOTE: {@code affinityBackupFilter} is not read.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ parts = in.readInt();
+ exclNeighbors = in.readBoolean();
+ hashIdRslvr = (AffinityNodeHashResolver)in.readObject();
+ backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
+ }
+
+ /**
+ * Orders (hash, node) tuples by the hash in ascending order; tuples with equal hashes
+ * are ordered by the node ID.
+ */
+ private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+ return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
+ o1.get2().id().compareTo(o2.get2().id());
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 3e79da0,0716c20..7baea2e
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@@ -18,11 -18,7 +18,12 @@@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.GridCacheAffinityBackupsSelfTest;
+import org.apache.ignite.IgniteCacheAffinitySelfTest;
+import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest;
+import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest;
+import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest;
+ import org.apache.ignite.internal.processors.cache.CacheKeepBinaryTransactionTest;
import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest;
import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest;
import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
@@@ -58,10 -53,12 +59,11 @@@ public class IgniteCacheTestSuite5 exte
suite.addTestSuite(IgniteCacheStoreCollectionTest.class);
suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class);
suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class);
- suite.addTestSuite(GridCacheSwapSpaceSpiConsistencySelfTest.class);
+ suite.addTestSuite(CacheKeepBinaryTransactionTest.class);
suite.addTestSuite(CacheLateAffinityAssignmentTest.class);
- suite.addTestSuite(CacheLateAffinityAssignmentFairAffinityTest.class);
suite.addTestSuite(CacheLateAffinityAssignmentNodeJoinValidationTest.class);
+ suite.addTestSuite(IgniteActiveOnStartNodeJoinValidationSelfTest.class);
suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class);
suite.addTestSuite(IgniteCacheSyncRebalanceModeSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySurrogateTypeDescriptor.cs
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/48494478/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 61439b1,1dae576..f47cbe2
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@@ -497,8 -505,8 +497,8 @@@ namespace Apache.Ignite.Core.Impl.Binar
var ser = GetSerializer(_cfg, null, type, typeId, null, null, _log);
desc = desc == null
- ? new BinaryFullTypeDescriptor(type, typeId, typeName, true, _cfg.DefaultNameMapper,
- _cfg.DefaultIdMapper, ser, false, null, type.IsEnum, registered)
+ ? new BinaryFullTypeDescriptor(type, typeId, typeName, true, _cfg.NameMapper,
- _cfg.IdMapper, ser, false, null, type.IsEnum, null, registered)
++ _cfg.IdMapper, ser, false, null, type.IsEnum, registered)
: new BinaryFullTypeDescriptor(desc, type, ser, registered);
if (RegistrationDisabled)