Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/16 16:43:14 UTC
[11/12] incubator-ignite git commit: #IGNITE-99: Refactoring. Move
GridCache.affinity() to Ignite.affinity(String cacheName).
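For readers following the refactoring, a hypothetical before/after sketch of an affected call site (the cache name, key and variables are illustrative only; exact affinity types may still be in flux at this point in the rename):

    // Before: affinity was obtained from the cache projection.
    // ClusterNode n = cache.affinity().mapKeyToNode(key);

    // After: affinity is obtained from the Ignite instance by cache name.
    ClusterNode n = ignite.affinity("partitioned").mapKeyToNode(key);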
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
new file mode 100644
index 0000000..3584200
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.gridgain.grid.cache.*;
+
+import java.io.*;
+
+/**
+ * Affinity mapper which maps cache key to an affinity key. Affinity key is a key which will be
+ * used to determine a node on which this key will be cached. Every cache key will first be passed
+ * through {@link #affinityKey(Object)} method, and the returned value of this method
+ * will be given to {@link CacheAffinityFunction} implementation to find out key-to-node affinity.
+ * <p>
+ * The default implementation, which will be used if no explicit affinity mapper is specified
+ * in cache configuration, will first look for any field or method annotated with
+ * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} annotation. If such field or method
+ * is not found, then the cache key itself will be returned from {@link #affinityKey(Object) affinityKey(Object)}
+ * method (this means that all objects with the same cache key will always be routed to the same node).
+ * If such field or method is found, then the value of this field or method will be returned from
+ * {@link #affinityKey(Object) affinityKey(Object)} method. This makes it possible to specify an alternate
+ * affinity key, other than the cache key itself, whenever needed.
+ * <p>
+ * A custom (other than default) affinity mapper can be provided
+ * via {@link GridCacheConfiguration#getAffinityMapper()} configuration property.
+ * <p>
+ * For more information on affinity mapping and examples refer to {@link CacheAffinityFunction} and
+ * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} documentation.
+ * @see CacheAffinityFunction
+ * @see CacheAffinityKeyMapped
+ */
+public interface CacheAffinityKeyMapper extends Serializable {
+ /**
+ * Maps passed in key to an alternate key which will be used for node affinity.
+ *
+ * @param key Key to map.
+ * @return Key to be used for node-to-affinity mapping (may be the same
+ * key as passed in).
+ */
+ public Object affinityKey(Object key);
+
+ /**
+ * Resets cache affinity mapper to its initial state. This method will be called by
+ * the system any time the affinity mapper has been sent to remote node where
+ * it has to be reinitialized. If your implementation of affinity mapper
+ * has no initialization logic, leave this method empty.
+ */
+ public void reset();
+}
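As an illustration of the contract above, a minimal sketch of a custom mapper that collocates orders by customer (the OrderKey class and its customerId() accessor are hypothetical):

    import org.apache.ignite.cache.affinity.*;

    public class OrderAffinityKeyMapper implements CacheAffinityKeyMapper {
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public Object affinityKey(Object key) {
            // Route all orders of the same customer to the same node.
            if (key instanceof OrderKey)
                return ((OrderKey)key).customerId(); // Hypothetical key class.

            return key; // Fall back to the cache key itself.
        }

        /** {@inheritDoc} */
        @Override public void reset() {
            // No initialization logic, so nothing to reset.
        }
    }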
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
new file mode 100644
index 0000000..732ebf6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Node hash resolver which uses {@link org.apache.ignite.cluster.ClusterNode#consistentId()} as alternate hash value.
+ */
+public class CacheAffinityNodeAddressHashResolver implements CacheAffinityNodeHashResolver {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Object resolve(ClusterNode node) {
+ return node.consistentId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheAffinityNodeAddressHashResolver.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
new file mode 100644
index 0000000..79489cf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.cluster.*;
+
+import java.io.*;
+
+/**
+ * Resolver which is used to provide node hash value for affinity function.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes affinity mapping to change between restarts
+ * and hence causes redundant repartitioning. Providing an alternate node hash value which survives node restarts
+ * helps to map keys to the same nodes whenever possible.
+ * <p>
+ * Note that if client nodes exist, they will query this object from the server and use it for affinity calculation.
+ * Therefore you must ensure that servers and clients can marshal and unmarshal this object in portable format,
+ * i.e. that all parties have the object class(es) configured as portable.
+ */
+public interface CacheAffinityNodeHashResolver extends Serializable {
+ /**
+ * Resolve alternate hash value for the given Grid node.
+ *
+ * @param node Grid node.
+ * @return Resolved hash ID.
+ */
+ public Object resolve(ClusterNode node);
+}
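A short sketch of the contract in practice: a resolver that prefers a restart-stable, operator-assigned node attribute and falls back to the consistent ID (the attribute name is hypothetical; ClusterNode#attribute(String) is used elsewhere in this commit):

    import org.apache.ignite.cache.affinity.*;
    import org.apache.ignite.cluster.*;

    public class AttributeNodeHashResolver implements CacheAffinityNodeHashResolver {
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public Object resolve(ClusterNode node) {
            // Prefer a deployment-assigned attribute that survives restarts.
            Object hashId = node.attribute("affinity.hash.id"); // Hypothetical attribute name.

            return hashId != null ? hashId : node.consistentId();
        }
    }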
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
new file mode 100644
index 0000000..9604d5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Node hash resolver which uses the generated node ID as the node hash value. As a new node ID is generated
+ * on each node start, this resolver does not provide the ability to map keys to the same nodes after a restart.
+ */
+public class CacheAffinityNodeIdHashResolver implements CacheAffinityNodeHashResolver {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Object resolve(ClusterNode node) {
+ return node.id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheAffinityNodeIdHashResolver.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
new file mode 100644
index 0000000..0df8ffe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.lang.annotation.*;
+
+/**
+ * Marker annotation which identifies an affinity function that must be calculated on one centralized node
+ * instead of independently on each node. This is typically the case when the function requires the previous
+ * affinity state in order to calculate the new one.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface CacheCentralizedAffinityFunction {
+
+}
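Applying the marker is a single annotation on the function class. A hypothetical sketch of a history-dependent function that would need it (the assignment logic is deliberately simplistic; a real function would also balance the load):

    import org.apache.ignite.cache.affinity.*;
    import org.apache.ignite.cluster.*;

    import java.util.*;

    @CacheCentralizedAffinityFunction
    public class StickyAffinityFunction implements CacheAffinityFunction {
        private static final long serialVersionUID = 0L;

        /** Fixed partition count for this sketch. */
        private static final int PARTS = 128;

        /** {@inheritDoc} */
        @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) {
            List<List<ClusterNode>> res = new ArrayList<>(PARTS);

            // Depends on previous affinity state, hence the marker annotation:
            // keep previous owners where they exist, otherwise pick the first node.
            for (int part = 0; part < PARTS; part++) {
                List<ClusterNode> prev = ctx.previousAssignment(part);

                res.add(prev != null && !prev.isEmpty() ? prev :
                    Collections.singletonList(ctx.currentTopologySnapshot().get(0)));
            }

            return res;
        }

        /** {@inheritDoc} */
        @Override public int partition(Object key) {
            return Math.abs(key.hashCode() % PARTS);
        }

        /** {@inheritDoc} */
        @Override public int partitions() {
            return PARTS;
        }

        /** {@inheritDoc} */
        @Override public void reset() {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void removeNode(UUID nodeId) {
            // No-op.
        }
    }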
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java
new file mode 100644
index 0000000..17b53fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.consistenthash;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Affinity function for partitioned cache. This function supports the following
+ * configuration:
+ * <ul>
+ * <li>
+ * {@code backups} - Use this parameter to control how many backup nodes will be
+ * assigned to every key. The default value is {@code 0}.
+ * </li>
+ * <li>
+ * {@code replicas} - Generally the more replicas a node gets, the more key assignments
+ * it will receive. You can configure a different number of replicas for a node by
+ * setting a user attribute with the name {@link #getReplicaCountAttributeName()} to some
+ * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant.
+ * </li>
+ * <li>
+ * {@code backupFilter} - Optional filter for backup nodes. If provided, then only
+ * nodes that pass this filter will be selected as backup nodes. If not provided, then
+ * primary and backup nodes will be selected out of all nodes available for this cache.
+ * </li>
+ * </ul>
+ * <p>
+ * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method.
+ */
+public class CacheConsistentHashAffinityFunction implements CacheAffinityFunction {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Flag to enable/disable consistency check (for internal use only). */
+ private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("GRIDGAIN_AFFINITY_CONSISTENCY_CHECK");
+
+ /** Default number of partitions. */
+ public static final int DFLT_PARTITION_COUNT = 10000;
+
+ /** Default replica count for partitioned caches. */
+ public static final int DFLT_REPLICA_COUNT = 128;
+
+ /**
+ * Name of node attribute to specify number of replicas for a node.
+ * Default value is {@code gg:affinity:node:replicas}.
+ */
+ public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas";
+
+ /** Node hash. */
+ private transient GridConsistentHash<NodeInfo> nodeHash;
+
+ /** Total number of partitions. */
+ private int parts = DFLT_PARTITION_COUNT;
+
+ /** */
+ private int replicas = DFLT_REPLICA_COUNT;
+
+ /** */
+ private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME;
+
+ /** */
+ private boolean exclNeighbors;
+
+ /**
+ * Optional backup filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ */
+ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+ /** */
+ private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver();
+
+ /** Injected grid. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Injected cache name. */
+ @IgniteCacheNameResource
+ private String cacheName;
+
+ /** Injected logger. */
+ @IgniteLoggerResource
+ private IgniteLogger log;
+
+ /** Initialization flag. */
+ @SuppressWarnings("TransientFieldNotInitialized")
+ private transient AtomicBoolean init = new AtomicBoolean();
+
+ /** Latch for initializing. */
+ @SuppressWarnings({"TransientFieldNotInitialized"})
+ private transient CountDownLatch initLatch = new CountDownLatch(1);
+
+ /** Nodes IDs. */
+ @GridToStringInclude
+ @SuppressWarnings({"TransientFieldNotInitialized"})
+ private transient ConcurrentMap<UUID, NodeInfo> addedNodes = new ConcurrentHashMap<>();
+
+ /** Optional backup filter. */
+ @GridToStringExclude
+ private final IgniteBiPredicate<NodeInfo, NodeInfo> backupIdFilter = new IgniteBiPredicate<NodeInfo, NodeInfo>() {
+ @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) {
+ return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node());
+ }
+ };
+
+ /** Map of neighbors. */
+ @SuppressWarnings("TransientFieldNotInitialized")
+ private transient ConcurrentMap<UUID, Collection<UUID>> neighbors =
+ new ConcurrentHashMap8<>();
+
+ /**
+ * Empty constructor with all defaults.
+ */
+ public CacheConsistentHashAffinityFunction() {
+ // No-op.
+ }
+
+ /**
+ * Initializes affinity with a flag to exclude same-host neighbors from being backups of each other.
+ * <p>
+ * Note that the {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ */
+ public CacheConsistentHashAffinityFunction(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
+ }
+
+ /**
+ * Initializes affinity with a flag to exclude same-host neighbors from being backups of each other
+ * and the specified number of partitions.
+ * <p>
+ * Note that the {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ * @param parts Total number of partitions.
+ */
+ public CacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) {
+ A.ensure(parts != 0, "parts != 0");
+
+ this.exclNeighbors = exclNeighbors;
+ this.parts = parts;
+ }
+
+ /**
+ * Initializes affinity with the specified number of partitions and an optional backup filter.
+ * <p>
+ * Note that the {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ *
+ * @param parts Total number of partitions.
+ * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected
+ * from all nodes that pass this filter. The first argument for this filter is the primary node, and the
+ * second argument is the node being tested.
+ */
+ public CacheConsistentHashAffinityFunction(int parts,
+ @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ A.ensure(parts != 0, "parts != 0");
+
+ this.parts = parts;
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Gets default count of virtual replicas in consistent hash ring.
+ * <p>
+ * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()}
+ * name will be checked first. If it is absent, then this value will be used.
+ *
+ * @return Count of virtual replicas in consistent hash ring.
+ */
+ public int getDefaultReplicas() {
+ return replicas;
+ }
+
+ /**
+ * Sets default count of virtual replicas in consistent hash ring.
+ * <p>
+ * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name
+ * will be checked first. If it is absent, then this value will be used.
+ *
+ * @param replicas Count of virtual replicas in consistent hash ring.
+ */
+ public void setDefaultReplicas(int replicas) {
+ this.replicas = replicas;
+ }
+
+ /**
+ * Gets total number of key partitions. To ensure that all partitions are
+ * equally distributed across all nodes, please make sure that this
+ * number is significantly larger than the number of nodes. Also, partition
+ * size should be relatively small. Try to avoid having partitions with more
+ * than a quarter million keys.
+ * <p>
+ * Note that for fully replicated caches this method should always
+ * return {@code 1}.
+ *
+ * @return Total partition count.
+ */
+ public int getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets total number of partitions.
+ *
+ * @param parts Total number of partitions.
+ */
+ public void setPartitions(int parts) {
+ this.parts = parts;
+ }
+
+ /**
+ * Gets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed at different locations in the hash ring and hence causes
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts the node at the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @return Hash ID resolver.
+ */
+ public CacheAffinityNodeHashResolver getHashIdResolver() {
+ return hashIdRslvr;
+ }
+
+ /**
+ * Sets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs constantly change when nodes get restarted, which causes them to
+ * be placed at different locations in the hash ring and hence causes
+ * repartitioning. Providing an alternate hash ID, which survives node restarts,
+ * puts the node at the same location on the hash ring, hence minimizing required
+ * repartitioning.
+ *
+ * @param hashIdRslvr Hash ID resolver.
+ */
+ public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) {
+ this.hashIdRslvr = hashIdRslvr;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+ return backupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ *
+ * @param backupFilter Optional backup filter.
+ */
+ public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Gets optional attribute name for replica count. If not provided, the
+ * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}.
+ *
+ * @return User attribute name for replica count for a node.
+ */
+ public String getReplicaCountAttributeName() {
+ return attrName;
+ }
+
+ /**
+ * Sets optional attribute name for replica count. If not provided, the
+ * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}.
+ *
+ * @param attrName User attribute name for replica count for a node.
+ */
+ public void setReplicaCountAttributeName(String attrName) {
+ this.attrName = attrName;
+ }
+
+ /**
+ * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that the {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
+ *
+ * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public boolean isExcludeNeighbors() {
+ return exclNeighbors;
+ }
+
+ /**
+ * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that the {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public void setExcludeNeighbors(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
+ }
+
+ /**
+ * Gets neighbors for a node.
+ *
+ * @param node Node.
+ * @return Neighbors.
+ */
+ private Collection<UUID> neighbors(final ClusterNode node) {
+ Collection<UUID> ns = neighbors.get(node.id());
+
+ if (ns == null) {
+ Collection<ClusterNode> nodes = ignite.cluster().forHost(node).nodes();
+
+ ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes)));
+ }
+
+ return ns;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) {
+ List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+ Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();
+
+ for (int part = 0; part < parts; part++) {
+ res.add(F.isEmpty(topSnapshot) ?
+ Collections.<ClusterNode>emptyList() :
+ // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection
+ // doesn't provide equals and hashCode implementations.
+ U.sealList(nodes(part, topSnapshot, ctx.backups())));
+ }
+
+ return res;
+ }
+
+ /**
+ * Assigns nodes to one partition.
+ *
+ * @param part Partition to assign nodes for.
+ * @param nodes Cache topology nodes.
+ * @param backups Number of backup nodes to assign in addition to the primary node.
+ * @return Assigned nodes, first node is primary, others are backups.
+ */
+ public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes, int backups) {
+ if (nodes == null)
+ return Collections.emptyList();
+
+ int nodesSize = nodes.size();
+
+ if (nodesSize == 0)
+ return Collections.emptyList();
+
+ if (nodesSize == 1) // Minor optimization.
+ return nodes;
+
+ initialize();
+
+ final Map<NodeInfo, ClusterNode> lookup = new GridLeanMap<>(nodesSize);
+
+ // Store nodes in map for fast lookup.
+ for (ClusterNode n : nodes)
+ // Add nodes into hash circle, if absent.
+ lookup.put(resolveNodeInfo(n), n);
+
+ Collection<NodeInfo> selected;
+
+ if (backupFilter != null) {
+ final IgnitePredicate<NodeInfo> p = new P1<NodeInfo>() {
+ @Override public boolean apply(NodeInfo id) {
+ return lookup.containsKey(id);
+ }
+ };
+
+ final NodeInfo primaryId = nodeHash.node(part, p);
+
+ IgnitePredicate<NodeInfo> backupPrimaryIdFilter = new IgnitePredicate<NodeInfo>() {
+ @Override public boolean apply(NodeInfo node) {
+ return backupIdFilter.apply(primaryId, node);
+ }
+ };
+
+ Collection<NodeInfo> backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter);
+
+ if (F.isEmpty(backupIds) && primaryId != null) {
+ ClusterNode n = lookup.get(primaryId);
+
+ assert n != null;
+
+ return Collections.singletonList(n);
+ }
+
+ selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds;
+ }
+ else {
+ if (!exclNeighbors) {
+ selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1<NodeInfo>() {
+ @Override public boolean apply(NodeInfo id) {
+ return lookup.containsKey(id);
+ }
+ });
+
+ if (selected.size() == 1) {
+ NodeInfo id = F.first(selected);
+
+ assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected;
+
+ ClusterNode n = lookup.get(id);
+
+ assert n != null;
+
+ return Collections.singletonList(n);
+ }
+ }
+ else {
+ int primaryAndBackups = backups + 1;
+
+ selected = new ArrayList<>(primaryAndBackups);
+
+ final Collection<NodeInfo> selected0 = selected;
+
+ List<NodeInfo> ids = nodeHash.nodes(part, primaryAndBackups, new P1<NodeInfo>() {
+ @Override public boolean apply(NodeInfo id) {
+ ClusterNode n = lookup.get(id);
+
+ if (n == null)
+ return false;
+
+ Collection<UUID> neighbors = neighbors(n);
+
+ for (NodeInfo id0 : selected0) {
+ ClusterNode n0 = lookup.get(id0);
+
+ if (n0 == null)
+ return false;
+
+ Collection<UUID> neighbors0 = neighbors(n0);
+
+ if (F.containsAny(neighbors0, neighbors))
+ return false;
+ }
+
+ selected0.add(id);
+
+ return true;
+ }
+ });
+
+ if (AFFINITY_CONSISTENCY_CHECK)
+ assert F.eqOrdered(ids, selected);
+ }
+ }
+
+ Collection<ClusterNode> ret = new ArrayList<>(selected.size());
+
+ for (NodeInfo id : selected) {
+ ClusterNode n = lookup.get(id);
+
+ assert n != null;
+
+ ret.add(n);
+ }
+
+ return ret;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ initialize();
+
+ return U.safeAbs(key.hashCode() % parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ initialize();
+
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ addedNodes = new ConcurrentHashMap<>();
+ neighbors = new ConcurrentHashMap8<>();
+
+ initLatch = new CountDownLatch(1);
+
+ init = new AtomicBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ NodeInfo info = addedNodes.remove(nodeId);
+
+ if (info == null)
+ return;
+
+ nodeHash.removeNode(info);
+
+ neighbors.clear();
+ }
+
+ /**
+ * Resolves node info for the specified node.
+ * Adds the node to the hash circle if this is the first invocation for this node.
+ *
+ * @param n Node to get info for.
+ * @return Node info.
+ */
+ private NodeInfo resolveNodeInfo(ClusterNode n) {
+ UUID nodeId = n.id();
+ NodeInfo nodeInfo = addedNodes.get(nodeId);
+
+ if (nodeInfo != null)
+ return nodeInfo;
+
+ assert hashIdRslvr != null;
+
+ nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n);
+
+ neighbors.clear();
+
+ nodeHash.addNode(nodeInfo, replicas(n));
+
+ addedNodes.put(nodeId, nodeInfo);
+
+ return nodeInfo;
+ }
+
+ /** Performs one-time initialization of the consistent hash; concurrent callers wait until it completes. */
+ private void initialize() {
+ if (!init.get() && init.compareAndSet(false, true)) {
+ if (log.isInfoEnabled())
+ log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts +
+ ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas +
+ ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']');
+
+ nodeHash = new GridConsistentHash<>();
+
+ initLatch.countDown();
+ }
+ else {
+ if (initLatch.getCount() > 0) {
+ try {
+ U.await(initLatch);
+ }
+ catch (GridInterruptedException ignored) {
+ // Recover interrupted state flag.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ /**
+ * @param n Node.
+ * @return Replicas.
+ */
+ private int replicas(ClusterNode n) {
+ Integer nodeReplicas = n.attribute(attrName);
+
+ if (nodeReplicas == null)
+ nodeReplicas = replicas;
+
+ return nodeReplicas;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheConsistentHashAffinityFunction.class, this);
+ }
+
+ /**
+ * Node hash ID.
+ */
+ private static final class NodeInfo implements Comparable<NodeInfo> {
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Hash ID. */
+ private Object hashId;
+
+ /** Grid node. */
+ private ClusterNode node;
+
+ /**
+ * @param nodeId Node ID.
+ * @param hashId Hash ID.
+ * @param node Rich node.
+ */
+ private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) {
+ assert nodeId != null;
+ assert hashId != null;
+
+ this.hashId = hashId;
+ this.nodeId = nodeId;
+ this.node = node;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Hash ID.
+ */
+ public Object hashId() {
+ return hashId;
+ }
+
+ /**
+ * @return Node.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return hashId.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof NodeInfo))
+ return false;
+
+ NodeInfo that = (NodeInfo)obj;
+
+ // If objects are equal, hash codes should be the same.
+ // Cannot use that.hashId.equals(hashId) due to Comparable<N> interface restrictions.
+ return that.nodeId.equals(nodeId) && that.hashCode() == hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(NodeInfo o) {
+ int diff = nodeId.compareTo(o.nodeId);
+
+ if (diff == 0) {
+ int h1 = hashCode();
+ int h2 = o.hashCode();
+
+ diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1);
+ }
+
+ return diff;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(NodeInfo.class, this);
+ }
+ }
+}
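A minimal configuration sketch wiring this function into a cache. The affinity setters below appear in this commit; setAffinity is the assumed counterpart of the GridCacheConfiguration#getAffinity() method referenced in the Javadoc, and the cache name and tuning values are illustrative:

    CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction();

    aff.setPartitions(2048);       // Keep well above the expected node count.
    aff.setDefaultReplicas(256);   // Virtual replicas per node in the hash ring.
    aff.setExcludeNeighbors(true); // Same-host nodes may not back up each other.
    aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver()); // Restart-stable placement.

    GridCacheConfiguration ccfg = new GridCacheConfiguration();

    ccfg.setName("partitioned");
    ccfg.setAffinity(aff); // Assumed setter mirroring getAffinity().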
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html
new file mode 100644
index 0000000..f5d5e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+ <!-- Package description. -->
+ Contains consistent hash based cache affinity for partitioned cache.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
new file mode 100644
index 0000000..3b34413
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Fair affinity function which tries to ensure that all nodes get an equal number of partitions with
+ * a minimum number of reassignments between existing nodes.
+ * <p>
+ * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method.
+ */
+@CacheCentralizedAffinityFunction
+public class CachePartitionFairAffinity implements CacheAffinityFunction {
+ /** Default partition count. */
+ public static final int DFLT_PART_CNT = 256;
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Ascending comparator. */
+ private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(false);
+
+ /** Descending comparator. */
+ private static final Comparator<PartitionSet> DESC_CMP = new PartitionSetComparator(true);
+
+ /** */
+ private int parts;
+
+ /**
+ * Creates fair affinity with default partition count.
+ */
+ public CachePartitionFairAffinity() {
+ this(DFLT_PART_CNT);
+ }
+
+ /**
+ * @param parts Number of partitions.
+ */
+ public CachePartitionFairAffinity(int parts) {
+ this.parts = parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) {
+ List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();
+
+ if (topSnapshot.size() == 1) {
+ ClusterNode primary = topSnapshot.get(0);
+
+ List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+ for (int i = 0; i < parts; i++)
+ assignments.add(Collections.singletonList(primary));
+
+ return assignments;
+ }
+
+ IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> cp = createCopy(ctx, topSnapshot);
+
+ List<List<ClusterNode>> assignment = cp.get1();
+
+ int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());
+
+ // Per tier pending partitions.
+ Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();
+
+ FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot);
+
+ for (int tier = 0; tier < tiers; tier++) {
+ // Check if this is a new tier and add pending partitions.
+ Queue<Integer> pending = pendingParts.get(tier);
+
+ for (int part = 0; part < parts; part++) {
+ if (fullMap.assignments.get(part).size() < tier + 1) {
+ if (pending == null) {
+ pending = new LinkedList<>();
+
+ pendingParts.put(tier, pending);
+ }
+
+ if (!pending.contains(part))
+ pending.add(part);
+
+ }
+ }
+
+ // Assign pending partitions, if any.
+ assignPending(tier, pendingParts, fullMap, topSnapshot);
+
+ // Balance assignments.
+ balance(tier, pendingParts, fullMap, topSnapshot);
+ }
+
+ return fullMap.assignments;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ return U.safeAbs(hash(key.hashCode())) % parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /**
+ * Assigns pending (unassigned) partitions to nodes.
+ *
+ * @param tier Tier to assign (0 is primary, 1 is the first backup, and so on).
+ * @param pendingMap Pending partitions per tier.
+ * @param fullMap Full assignment map to modify.
+ * @param topSnapshot Topology snapshot.
+ */
+ private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap,
+ List<ClusterNode> topSnapshot) {
+ Queue<Integer> pending = pendingMap.get(tier);
+
+ if (F.isEmpty(pending))
+ return;
+
+ int idealPartCnt = parts / topSnapshot.size();
+
+ Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier);
+
+ PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false);
+
+ // First iterate over underloaded nodes.
+ assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false);
+
+ if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
+ // Same, forcing updates.
+ assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true);
+ }
+
+ if (!pending.isEmpty())
+ assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);
+
+ assert pending.isEmpty();
+
+ pendingMap.remove(tier);
+ }
+
+ /**
+ * Assigns pending partitions to underloaded nodes.
+ *
+ * @param tier Tier to assign.
+ * @param pendingMap Pending partitions per tier.
+ * @param fullMap Full assignment map to modify.
+ * @param underloadedNodes Underloaded nodes.
+ * @param topSnapshot Topology snapshot.
+ * @param force {@code True} if partitions should be moved.
+ */
+ private void assignPendingToUnderloaded(
+ int tier,
+ Map<Integer, Queue<Integer>> pendingMap,
+ FullAssignmentMap fullMap,
+ PrioritizedPartitionMap underloadedNodes,
+ Collection<ClusterNode> topSnapshot,
+ boolean force) {
+ Iterator<Integer> it = pendingMap.get(tier).iterator();
+
+ int ideal = parts / topSnapshot.size();
+
+ while (it.hasNext()) {
+ int part = it.next();
+
+ for (PartitionSet set : underloadedNodes.assignments()) {
+ ClusterNode node = set.node();
+
+ assert node != null;
+
+ if (fullMap.assign(part, tier, node, force, pendingMap)) {
+ // Partition was added to the partition map without forcing; remove it from pending.
+ it.remove();
+
+ if (set.size() <= ideal)
+ underloadedNodes.remove(set.nodeId());
+ else
+ underloadedNodes.update();
+
+ break; // for, continue to the next partition.
+ }
+ }
+
+ if (underloadedNodes.isEmpty())
+ return;
+ }
+ }
+
+ /**
+ * Spreads pending partitions equally to all nodes in topology snapshot.
+ *
+ * @param tier Tier to assign.
+ * @param pendingMap Pending partitions per tier.
+ * @param fullMap Full assignment map to modify.
+ * @param topSnapshot Topology snapshot.
+ */
+ private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap,
+ FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
+ Iterator<Integer> it = pendingMap.get(tier).iterator();
+
+ int idx = 0;
+
+ while (it.hasNext()) {
+ int part = it.next();
+
+ int i = idx;
+
+ boolean assigned = false;
+
+ do {
+ ClusterNode node = topSnapshot.get(i);
+
+ if (fullMap.assign(part, tier, node, false, pendingMap)) {
+ it.remove();
+
+ assigned = true;
+ }
+
+ i = (i + 1) % topSnapshot.size();
+
+ if (assigned)
+ idx = i;
+ } while (i != idx);
+
+ if (!assigned) {
+ do {
+ ClusterNode node = topSnapshot.get(i);
+
+ if (fullMap.assign(part, tier, node, true, pendingMap)) {
+ it.remove();
+
+ assigned = true;
+ }
+
+ i = (i + 1) % topSnapshot.size();
+
+ if (assigned)
+ idx = i;
+ } while (i != idx);
+ }
+
+ if (!assigned)
+ throw new IllegalStateException("Failed to find assignable node for partition.");
+ }
+ }
+
+ /**
+ * Tries to balance assignments between existing nodes in topology.
+ *
+ * @param tier Tier to assign.
+ * @param pendingParts Pending partitions per tier.
+ * @param fullMap Full assignment map to modify.
+ * @param topSnapshot Topology snapshot.
+ */
+ private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
+ Collection<ClusterNode> topSnapshot) {
+ int idealPartCnt = parts / topSnapshot.size();
+
+ Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);
+
+ PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false);
+ PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true);
+
+ do {
+ boolean retry = false;
+
+ for (PartitionSet overloaded : overloadedNodes.assignments()) {
+ for (Integer part : overloaded.partitions()) {
+ boolean assigned = false;
+
+ for (PartitionSet underloaded : underloadedNodes.assignments()) {
+ if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) {
+ // Size of partition sets has changed.
+ if (overloaded.size() <= idealPartCnt)
+ overloadedNodes.remove(overloaded.nodeId());
+ else
+ overloadedNodes.update();
+
+ if (underloaded.size() >= idealPartCnt)
+ underloadedNodes.remove(underloaded.nodeId());
+ else
+ underloadedNodes.update();
+
+ assigned = true;
+
+ retry = true;
+
+ break;
+ }
+ }
+
+ if (!assigned) {
+ for (PartitionSet underloaded : underloadedNodes.assignments()) {
+ if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) {
+ // Size of partition sets has changed.
+ if (overloaded.size() <= idealPartCnt)
+ overloadedNodes.remove(overloaded.nodeId());
+ else
+ overloadedNodes.update();
+
+ if (underloaded.size() >= idealPartCnt)
+ underloadedNodes.remove(underloaded.nodeId());
+ else
+ underloadedNodes.update();
+
+ retry = true;
+
+ break;
+ }
+ }
+ }
+
+ if (retry)
+ break; // for part.
+ }
+
+ if (retry)
+ break; // for overloaded.
+ }
+
+ if (!retry)
+ break;
+ }
+ while (true);
+ }
+
+ /**
+ * Constructs underloaded or overloaded partition map.
+ *
+ * @param mapping Mapping to filter.
+ * @param idealPartCnt Ideal number of partitions per node.
+ * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded.
+ * @return Prioritized partition map.
+ */
+ private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) {
+ assert mapping != null;
+
+ PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP);
+
+ for (PartitionSet set : mapping.values()) {
+ if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt))
+ res.add(set);
+ }
+
+ return res;
+ }
+
+ /**
+ * Creates copy of previous partition assignment.
+ *
+ * @param ctx Affinity function context.
+ * @param topSnapshot Topology snapshot.
+ * @return Assignment copy and per node partition map.
+ */
+ private IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> createCopy(
+ CacheAffinityFunctionContext ctx, Iterable<ClusterNode> topSnapshot) {
+ IgniteDiscoveryEvent discoEvt = ctx.discoveryEvent();
+
+ UUID leftNodeId = discoEvt.type() == IgniteEventType.EVT_NODE_JOINED ? null : discoEvt.eventNode().id();
+
+ List<List<ClusterNode>> cp = new ArrayList<>(parts);
+
+ Map<UUID, PartitionSet> parts = new HashMap<>();
+
+ for (int part = 0; part < this.parts; part++) {
+ List<ClusterNode> partNodes = ctx.previousAssignment(part);
+
+ List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size());
+
+ for (ClusterNode affNode : partNodes) {
+ if (!affNode.id().equals(leftNodeId)) {
+ partNodesCp.add(affNode);
+
+ PartitionSet partSet = parts.get(affNode.id());
+
+ if (partSet == null) {
+ partSet = new PartitionSet(affNode);
+
+ parts.put(affNode.id(), partSet);
+ }
+
+ partSet.add(part);
+ }
+ }
+
+ cp.add(partNodesCp);
+ }
+
+ if (leftNodeId == null) {
+ // Node joined, find it and add empty set to mapping.
+ ClusterNode joinedNode = null;
+
+ for (ClusterNode node : topSnapshot) {
+ if (node.id().equals(discoEvt.eventNode().id())) {
+ joinedNode = node;
+
+ break;
+ }
+ }
+
+ assert joinedNode != null;
+
+ parts.put(joinedNode.id(), new PartitionSet(joinedNode));
+ }
+
+ return F.t(cp, parts);
+ }
+
+ /**
+ * Comparator which orders partition sets by the number of partitions they contain.
+ */
+ private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private boolean descending;
+
+ /**
+ * @param descending {@code True} if comparator should be descending.
+ */
+ private PartitionSetComparator(boolean descending) {
+ this.descending = descending;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(PartitionSet o1, PartitionSet o2) {
+ int res = o1.parts.size() < o2.parts.size() ? -1 : o1.parts.size() > o2.parts.size() ? 1 : 0;
+
+ return descending ? -res : res;
+ }
+ }
+
+ /**
+ * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order
+ * by number of partitions assigned to a node.
+ */
+ private static class PrioritizedPartitionMap {
+ /** Comparator. */
+ private Comparator<PartitionSet> cmp;
+
+ /** Assignment map. */
+ private Map<UUID, PartitionSet> assignmentMap = new HashMap<>();
+
+ /** Assignment list, ordered according to comparator. */
+ private List<PartitionSet> assignmentList = new ArrayList<>();
+
+ /**
+ * @param cmp Comparator.
+ */
+ private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) {
+ this.cmp = cmp;
+ }
+
+ /**
+ * @param set Partition set to add.
+ */
+ public void add(PartitionSet set) {
+ PartitionSet old = assignmentMap.put(set.nodeId(), set);
+
+ if (old == null) {
+ assignmentList.add(set);
+
+ update();
+ }
+ }
+
+ /**
+ * Sorts assignment list.
+ */
+ public void update() {
+ Collections.sort(assignmentList, cmp);
+ }
+
+ /**
+ * @return Sorted assignment list.
+ */
+ public List<PartitionSet> assignments() {
+ return assignmentList;
+ }
+
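+ /**
+ * Removes the partition set of the given node from this map.
+ *
+ * @param uuid ID of the node whose partition set should be removed.
+ */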
+ public void remove(UUID uuid) {
+ PartitionSet rmv = assignmentMap.remove(uuid);
+
+ assignmentList.remove(rmv);
+ }
+
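+ /**
+ * @return {@code True} if this map contains no partition sets.
+ */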
+ public boolean isEmpty() {
+ return assignmentList.isEmpty();
+ }
+ }
+
+ /**
+ * Constructs assignment map for specified tier.
+ *
+ * @param tier Tier number, -1 for all tiers altogether.
+ * @param assignment Assignment to construct map from.
+ * @param topSnapshot Topology snapshot.
+ * @return Assignment map.
+ */
+ private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment,
+ Collection<ClusterNode> topSnapshot) {
+ Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
+
+ for (int part = 0; part < assignment.size(); part++) {
+ List<ClusterNode> nodes = assignment.get(part);
+
+ assert nodes instanceof RandomAccess;
+
+ if (nodes.size() <= tier)
+ continue;
+
+ int start = tier < 0 ? 0 : tier;
+ int end = tier < 0 ? nodes.size() : tier + 1;
+
+ for (int i = start; i < end; i++) {
+ ClusterNode n = nodes.get(i);
+
+ PartitionSet set = tmp.get(n.id());
+
+ if (set == null) {
+ set = new PartitionSet(n);
+
+ tmp.put(n.id(), set);
+ }
+
+ set.add(part);
+ }
+ }
+
+ if (tmp.size() < topSnapshot.size()) {
+ for (ClusterNode node : topSnapshot) {
+ if (!tmp.containsKey(node.id()))
+ tmp.put(node.id(), new PartitionSet(node));
+ }
+ }
+
+ return tmp;
+ }
+
+ /**
+ * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary
+ * maps consistent.
+ */
+ @SuppressWarnings("unchecked")
+ private static class FullAssignmentMap {
+ /** Per-tier assignment maps. */
+ private Map<UUID, PartitionSet>[] tierMaps;
+
+ /** Full assignment map. */
+ private Map<UUID, PartitionSet> fullMap;
+
+ /** Resulting assignment. */
+ private List<List<ClusterNode>> assignments;
+
+ /**
+ * @param tiers Number of tiers.
+ * @param assignments Assignments to modify.
+ * @param topSnapshot Topology snapshot.
+ */
+ private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) {
+ this.assignments = assignments;
+
+ tierMaps = new Map[tiers];
+
+ for (int tier = 0; tier < tiers; tier++)
+ tierMaps[tier] = assignments(tier, assignments, topSnapshot);
+
+ fullMap = assignments(-1, assignments, topSnapshot);
+ }
+
+ /**
+ * Tries to assign a partition to the given node on the specified tier. If {@code force} is {@code false},
+ * the assignment succeeds only if the partition is not already assigned to this node. If {@code force} is
+ * {@code true}, the assignment succeeds only if the partition is not assigned to this node on a tier with a
+ * number less than the one passed in; a partition assigned on a greater tier will be moved to the pending queue.
+ *
+ * @param part Partition to assign.
+ * @param tier Tier number to assign.
+ * @param node Node to move partition to.
+ * @param force Force flag.
+ * @param pendingParts Per-tier pending partitions map.
+ * @return {@code True} if assignment succeeded.
+ */
+ boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) {
+ UUID nodeId = node.id();
+
+ if (!fullMap.get(nodeId).contains(part)) {
+ tierMaps[tier].get(nodeId).add(part);
+
+ fullMap.get(nodeId).add(part);
+
+ List<ClusterNode> assignment = assignments.get(part);
+
+ if (assignment.size() <= tier)
+ assignment.add(node);
+ else {
+ ClusterNode oldNode = assignment.set(tier, node);
+
+ if (oldNode != null) {
+ UUID oldNodeId = oldNode.id();
+
+ tierMaps[tier].get(oldNodeId).remove(part);
+ fullMap.get(oldNodeId).remove(part);
+ }
+ }
+
+ return true;
+ }
+ else if (force) {
+ assert !tierMaps[tier].get(nodeId).contains(part);
+
+ // Check previous tiers first.
+ for (int t = 0; t < tier; t++) {
+ if (tierMaps[t].get(nodeId).contains(part))
+ return false;
+ }
+
+ // Partition is on some lower tier, switch it.
+ for (int t = tier + 1; t < tierMaps.length; t++) {
+ if (tierMaps[t].get(nodeId).contains(part)) {
+ ClusterNode oldNode = assignments.get(part).get(tier);
+
+ // Move partition from level t to tier.
+ assignments.get(part).set(tier, node);
+ assignments.get(part).set(t, null);
+
+ if (oldNode != null) {
+ tierMaps[tier].get(oldNode.id()).remove(part);
+ fullMap.get(oldNode.id()).remove(part);
+ }
+
+ tierMaps[tier].get(nodeId).add(part);
+ tierMaps[t].get(nodeId).remove(part);
+
+ Queue<Integer> pending = pendingParts.get(t);
+
+ if (pending == null) {
+ pending = new LinkedList<>();
+
+ pendingParts.put(t, pending);
+ }
+
+ pending.add(part);
+
+ return true;
+ }
+ }
+
+ throw new IllegalStateException("Unable to assign partition to node while force is true.");
+ }
+
+ // !force.
+ return false;
+ }
+
+ /**
+ * Gets tier mapping.
+ *
+ * @param tier Tier to get mapping.
+ * @return Per node map.
+ */
+ public Map<UUID, PartitionSet> tierMapping(int tier) {
+ return tierMaps[tier];
+ }
+ }
+
+ /**
+ * Applies a supplemental hash function to a given hashCode, which
+ * defends against poor quality hash functions.
+ *
+ * @param h Hash code.
+ * @return Enhanced hash code.
+ */
+ private static int hash(int h) {
+ // Spread bits to regularize both segment and index locations,
+ // using variant of single-word Wang/Jenkins hash.
+ h += (h << 15) ^ 0xffffcd7d;
+ h ^= (h >>> 10);
+ h += (h << 3);
+ h ^= (h >>> 6);
+ h += (h << 2) + (h << 14);
+ return h ^ (h >>> 16);
+ }
+
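+ /**
+ * Set of partitions assigned to a single node.
+ */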
+ @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
+ private static class PartitionSet {
+ /** */
+ private ClusterNode node;
+
+ /** Partitions. */
+ private Collection<Integer> parts = new LinkedList<>();
+
+ /**
+ * @param node Node.
+ */
+ private PartitionSet(ClusterNode node) {
+ this.node = node;
+ }
+
+ /**
+ * @return Node.
+ */
+ private ClusterNode node() {
+ return node;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ private UUID nodeId() {
+ return node.id();
+ }
+
+ /**
+ * @return Partition set size.
+ */
+ private int size() {
+ return parts.size();
+ }
+
+ /**
+ * Adds partition to partition set.
+ *
+ * @param part Partition to add.
+ * @return {@code True} if partition was added, {@code false} if partition already exists.
+ */
+ private boolean add(int part) {
+ if (!parts.contains(part)) {
+ parts.add(part);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param part Partition to remove.
+ */
+ private void remove(Integer part) {
+ parts.remove(part); // Remove object, not index.
+ }
+
+ /**
+ * @return Partitions.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private Collection<Integer> partitions() {
+ return parts;
+ }
+
+ /**
+ * Checks if partition set contains given partition.
+ *
+ * @param part Partition to check.
+ * @return {@code True} if partition set contains given partition.
+ */
+ private boolean contains(int part) {
+ return parts.contains(part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']';
+ }
+ }
+}
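Similarly, a hypothetical configuration sketch for the fair function, again assuming a setAffinity counterpart to GridCacheConfiguration#getAffinity() (values are illustrative):

    // Double the default partition count of 256.
    CachePartitionFairAffinity aff = new CachePartitionFairAffinity(512);

    GridCacheConfiguration ccfg = new GridCacheConfiguration();

    ccfg.setName("partitioned");
    ccfg.setAffinity(aff); // Assumed setter mirroring getAffinity().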
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html
new file mode 100644
index 0000000..f71f3c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+ <!-- Package description. -->
+ Contains fair cache affinity implementation for partitioned caches.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html
new file mode 100644
index 0000000..defee90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+ <!-- Package description. -->
+ Contains cache node affinity implementations.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
new file mode 100644
index 0000000..1a7a6ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.gridgain.grid.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.security.*;
+import java.util.*;
+
+/**
+ * Affinity function for partitioned cache based on Highest Random Weight algorithm.
+ * This function supports the following configuration:
+ * <ul>
+ * <li>
+ * {@code partitions} - Number of partitions to spread across nodes.
+ * </li>
+ * <li>
+ * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
+ * from being backups of each other. Note that {@code backupFilter} is ignored if
+ * {@code excludeNeighbors} is set to {@code true}.
+ * </li>
+ * <li>
+ * {@code backupFilter} - Optional filter for backup nodes. If provided, then only
+ * nodes that pass this filter will be selected as backup nodes. If not provided, then
+ * primary and backup nodes will be selected out of all nodes available for this cache.
+ * </li>
+ * </ul>
+ * <p>
+ * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method.
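+ * <p>
+ * For illustration, a minimal configuration sketch (assuming a {@code setAffinity(...)}
+ * setter paired with the {@code getAffinity()} getter referenced above):
+ * <pre>
+ * GridCacheConfiguration cacheCfg = new GridCacheConfiguration();
+ *
+ * // 1024 partitions; same-host neighbors may not back up each other.
+ * cacheCfg.setAffinity(new CacheRendezvousAffinityFunction(true, 1024));
+ * </pre>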
+ */
+public class CacheRendezvousAffinityFunction implements CacheAffinityFunction, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Default number of partitions. */
+ public static final int DFLT_PARTITION_COUNT = 10000;
+
+ /** Comparator. */
+ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
+ new HashComparator();
+
+ /** Thread-local message digest ({@code MessageDigest} instances are not thread-safe). */
+ private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
+ @Override protected MessageDigest initialValue() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ assert false : "Should have failed in constructor";
+
+ throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
+ e);
+ }
+ }
+ };
+
+ /** Number of partitions. */
+ private int parts;
+
+ /** Exclude neighbors flag. */
+ private boolean exclNeighbors;
+
+ /** Optional backup filter. First node is primary, second node is a node being tested. */
+ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+ /** Hash ID resolver. */
+ private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver();
+
+ /** Marshaller. */
+ private IgniteMarshaller marshaller = new IgniteOptimizedMarshaller(false);
+
+ /**
+ * Empty constructor with all defaults.
+ */
+ public CacheRendezvousAffinityFunction() {
+ this(false);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ */
+ public CacheRendezvousAffinityFunction(boolean exclNeighbors) {
+ this(exclNeighbors, DFLT_PARTITION_COUNT);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and specified number of partitions.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ * @param parts Total number of partitions.
+ */
+ public CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts) {
+ this(exclNeighbors, parts, null);
+ }
+
+ /**
+ * Initializes affinity with specified number of partitions and optional backup filter.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param parts Total number of partitions.
+ * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected
+ * from all nodes that pass this filter. First argument for this filter is primary node, and second
+ * argument is node being tested.
+ */
+ public CacheRendezvousAffinityFunction(int parts,
+ @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this(false, parts, backupFilter);
+ }
+
+ /**
+ * Private constructor.
+ *
+ * @param exclNeighbors Exclude neighbors flag.
+ * @param parts Partitions count.
+ * @param backupFilter Backup filter.
+ */
+ private CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts,
+ IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ A.ensure(parts != 0, "parts != 0");
+
+ this.exclNeighbors = exclNeighbors;
+ this.parts = parts;
+ this.backupFilter = backupFilter;
+
+ try {
+ MessageDigest.getInstance("MD5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
+ }
+ }
+
+ /**
+ * Gets total number of key partitions. To ensure that all partitions are
+ * equally distributed across all nodes, make sure that this number is
+ * significantly larger than the number of nodes. Also, partition size
+ * should be relatively small: try to avoid partitions with more than
+ * a quarter million keys.
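+ * For example, with the default {@code 10000} partitions, a 100-node topology
+ * averages roughly 100 partitions per node.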
+ * <p>
+ * Note that for fully replicated caches this method should always
+ * return {@code 1}.
+ *
+ * @return Total partition count.
+ */
+ public int getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets total number of partitions.
+ *
+ * @param parts Total number of partitions.
+ */
+ public void setPartitions(int parts) {
+ this.parts = parts;
+ }
+
+ /**
+ * Gets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs change whenever nodes get restarted, which places nodes at different
+ * locations in the hash ring and hence triggers repartitioning. Providing an
+ * alternate hash ID that survives node restarts keeps a node at the same
+ * location on the hash ring, minimizing required repartitioning.
+ *
+ * @return Hash ID resolver.
+ */
+ public CacheAffinityNodeHashResolver getHashIdResolver() {
+ return hashIdRslvr;
+ }
+
+ /**
+ * Sets hash ID resolver for nodes. This resolver is used to provide
+ * alternate hash ID, other than node ID.
+ * <p>
+ * Node IDs change whenever nodes get restarted, which places nodes at different
+ * locations in the hash ring and hence triggers repartitioning. Providing an
+ * alternate hash ID that survives node restarts keeps a node at the same
+ * location on the hash ring, minimizing required repartitioning.
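+ * <p>
+ * A minimal sketch of a custom resolver (the {@code "consistent.id"} attribute
+ * name below is hypothetical):
+ * <pre>
+ * CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction();
+ *
+ * aff.setHashIdResolver(new CacheAffinityNodeHashResolver() {
+ *     &#64;Override public Object resolve(ClusterNode node) {
+ *         // Restart-stable identifier instead of the volatile node ID.
+ *         return node.attribute("consistent.id");
+ *     }
+ * });
+ * </pre>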
+ *
+ * @param hashIdRslvr Hash ID resolver.
+ */
+ public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) {
+ this.hashIdRslvr = hashIdRslvr;
+ }
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+ return backupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
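+ * <p>
+ * For example, a sketch of a filter that only allows backups on nodes carrying a
+ * hypothetical {@code "backup"} attribute:
+ * <pre>
+ * CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction();
+ *
+ * aff.setBackupFilter(new IgniteBiPredicate&lt;ClusterNode, ClusterNode&gt;() {
+ *     &#64;Override public boolean apply(ClusterNode primary, ClusterNode backup) {
+ *         return Boolean.TRUE.equals(backup.attribute("backup"));
+ *     }
+ * });
+ * </pre>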
+ *
+ * @param backupFilter Optional backup filter.
+ */
+ public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public boolean isExcludeNeighbors() {
+ return exclNeighbors;
+ }
+
+ /**
+ * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public void setExcludeNeighbors(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
+ }
+
+ /**
+ * Returns collection of nodes (primary first) for specified partition.
+ *
+ * @param part Partition to assign.
+ * @param nodes Topology snapshot to assign from.
+ * @param backups Number of backup copies.
+ * @param neighborhoodCache Optional map of same-host neighbors, used when neighbors are excluded.
+ * @return Assigned nodes, primary first.
+ */
+ public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+ @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+ if (nodes.size() <= 1)
+ return nodes;
+
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+
+ MessageDigest d = digest.get();
+
+ for (ClusterNode node : nodes) {
+ Object nodeHash = hashIdRslvr.resolve(node);
+
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ byte[] nodeHashBytes = marshaller.marshal(nodeHash);
+
+ out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+ out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
+
+ d.reset();
+
+ byte[] bytes = d.digest(out.toByteArray());
+
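+ // Interpret the first 8 digest bytes as a little-endian long: this is the node's
+ // "weight" for the given partition in the highest-random-weight scheme.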
+ long hash =
+ (bytes[0] & 0xFFL)
+ | ((bytes[1] & 0xFFL) << 8)
+ | ((bytes[2] & 0xFFL) << 16)
+ | ((bytes[3] & 0xFFL) << 24)
+ | ((bytes[4] & 0xFFL) << 32)
+ | ((bytes[5] & 0xFFL) << 40)
+ | ((bytes[6] & 0xFFL) << 48)
+ | ((bytes[7] & 0xFFL) << 56);
+
+ lst.add(F.t(hash, node));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ Collections.sort(lst, COMPARATOR);
+
+ int primaryAndBackups;
+
+ List<ClusterNode> res;
+
+ if (backups == Integer.MAX_VALUE) {
+ primaryAndBackups = Integer.MAX_VALUE;
+
+ res = new ArrayList<>();
+ }
+ else {
+ primaryAndBackups = backups + 1;
+
+ res = new ArrayList<>(primaryAndBackups);
+ }
+
+ ClusterNode primary = lst.get(0).get2();
+
+ res.add(primary);
+
+ // Select backups.
+ if (backups > 0) {
+ for (int i = 1; i < lst.size(); i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ if (exclNeighbors) {
+ Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res);
+
+ if (!allNeighbors.contains(node))
+ res.add(node);
+ }
+ else {
+ if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node)))
+ res.add(next.get2());
+ }
+
+ if (res.size() == primaryAndBackups)
+ break;
+ }
+ }
+
+ if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
+ // Iterate one more time in case there are not enough nodes that pass the exclude-neighbors criteria.
+ for (int i = 1; i < lst.size(); i++) {
+ IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+ ClusterNode node = next.get2();
+
+ if (!res.contains(node))
+ res.add(next.get2());
+
+ if (res.size() == primaryAndBackups)
+ break;
+ }
+ }
+
+ assert res.size() <= primaryAndBackups;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
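+ // Map the key's hash onto [0, parts); safeAbs guards against negative hash codes.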
+ return U.safeAbs(key.hashCode() % parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+ Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+ neighbors(affCtx.currentTopologySnapshot()) : null;
+
+ for (int i = 0; i < parts; i++) {
+ List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+ neighborhoodCache);
+
+ assignments.add(partAssignment);
+ }
+
+ return assignments;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(parts);
+ out.writeBoolean(exclNeighbors);
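+ // Hash ID resolver and backup filter, if set, must be serializable.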
+ out.writeObject(hashIdRslvr);
+ out.writeObject(backupFilter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ parts = in.readInt();
+ exclNeighbors = in.readBoolean();
+ hashIdRslvr = (CacheAffinityNodeHashResolver)in.readObject();
+ backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
+ }
+
+ /**
+ * Builds neighborhood map for all nodes in snapshot.
+ *
+ * @param topSnapshot Topology snapshot.
+ * @return Neighbors map.
+ */
+ private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+ Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
+
+ // Group nodes by MAC addresses.
+ for (ClusterNode node : topSnapshot) {
+ String macs = node.attribute(GridNodeAttributes.ATTR_MACS);
+
+ Collection<ClusterNode> nodes = macMap.get(macs);
+
+ if (nodes == null) {
+ nodes = new HashSet<>();
+
+ macMap.put(macs, nodes);
+ }
+
+ nodes.add(node);
+ }
+
+ Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
+
+ for (Collection<ClusterNode> group : macMap.values()) {
+ for (ClusterNode node : group)
+ neighbors.put(node.id(), group);
+ }
+
+ return neighbors;
+ }
+
+ /**
+ * @param neighborhoodCache Neighborhood cache.
+ * @param nodes Nodes.
+ * @return All neighbors for given nodes.
+ */
+ private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+ Iterable<ClusterNode> nodes) {
+ Collection<ClusterNode> res = new HashSet<>();
+
+ for (ClusterNode node : nodes) {
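+ // If a node is already in the result, its whole MAC group was added along with it.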
+ if (!res.contains(node))
+ res.addAll(neighborhoodCache.get(node.id()));
+ }
+
+ return res;
+ }
+
+ /**
+ * Comparator that orders (hash, node) pairs by hash value, breaking ties by node ID.
+ */
+ private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+ return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
+ o1.get2().id().compareTo(o2.get2().id());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html
new file mode 100644
index 0000000..780cabc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+<!-- Package description. -->
+Contains HRW-based cache affinity implementation for partitioned caches.
+</body>
+</html>