You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/16 16:43:11 UTC
[08/12] incubator-ignite git commit: #IGNITE-99: Refactoring. Move
GridCache.affinity() to Ignite.affinity(String cacheName).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java
index 10e4fb5..cd1c9ef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.portables.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.future.*;
@@ -50,13 +50,13 @@ public class GridAffinityAssignmentCache {
private int backups;
/** Affinity function. */
- private final GridCacheAffinityFunction aff;
+ private final CacheAffinityFunction aff;
/** Partitions count. */
private final int partsCnt;
/** Affinity mapper function. */
- private final GridCacheAffinityKeyMapper affMapper;
+ private final CacheAffinityKeyMapper affMapper;
/** Affinity calculation results cache: topology version => partition => nodes. */
private final ConcurrentMap<Long, GridAffinityAssignment> affCache;
@@ -82,8 +82,8 @@ public class GridAffinityAssignmentCache {
* @param affMapper Affinity key mapper.
*/
@SuppressWarnings("unchecked")
- public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, GridCacheAffinityFunction aff,
- GridCacheAffinityKeyMapper affMapper, int backups) {
+ public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, CacheAffinityFunction aff,
+ CacheAffinityKeyMapper affMapper, int backups) {
this.ctx = ctx;
this.aff = aff;
this.affMapper = affMapper;
@@ -144,7 +144,7 @@ public class GridAffinityAssignmentCache {
List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment();
List<List<ClusterNode>> assignment = aff.assignPartitions(
- new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups));
+ new CacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups));
GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
index c66176d..d6b7633 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
import org.gridgain.grid.kernal.processors.*;
@@ -364,7 +364,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
}
/**
- * Requests {@link GridCacheAffinityFunction} and {@link GridCacheAffinityKeyMapper} from remote node.
+ * Requests {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} and {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} from remote node.
*
* @param cacheName Name of cache on which affinity is requested.
* @param n Node from which affinity is requested.
@@ -376,8 +376,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure()
.callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get();
- GridCacheAffinityFunction f = (GridCacheAffinityFunction)unmarshall(ctx, n.id(), t.get1());
- GridCacheAffinityKeyMapper m = (GridCacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
+ CacheAffinityFunction f = (CacheAffinityFunction)unmarshall(ctx, n.id(), t.get1());
+ CacheAffinityKeyMapper m = (CacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
assert m != null;
@@ -458,10 +458,10 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
*/
private static class AffinityInfo {
/** Affinity function. */
- private GridCacheAffinityFunction affFunc;
+ private CacheAffinityFunction affFunc;
/** Mapper */
- private GridCacheAffinityKeyMapper mapper;
+ private CacheAffinityKeyMapper mapper;
/** Assignment. */
private GridAffinityAssignment assignment;
@@ -475,7 +475,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
* @param assignment Partition assignment.
* @param portableEnabled Portable enabled flag.
*/
- private AffinityInfo(GridCacheAffinityFunction affFunc, GridCacheAffinityKeyMapper mapper,
+ private AffinityInfo(CacheAffinityFunction affFunc, CacheAffinityKeyMapper mapper,
GridAffinityAssignment assignment, boolean portableEnabled) {
this.affFunc = affFunc;
this.mapper = mapper;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java
index 1a3f0ba..4594bba 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java
@@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.affinity;
import org.apache.ignite.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.managers.deployment.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -39,11 +38,11 @@ import java.util.concurrent.*;
*/
class GridAffinityUtils {
/**
- * Creates a job that will look up {@link GridCacheAffinityKeyMapper} and {@link GridCacheAffinityFunction} on a
+ * Creates a job that will look up {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} and {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} on a
* cache with given name. If they exist, this job will serialize and transfer them together with all deployment
* information needed to unmarshal objects on remote node. Result is returned as a {@link GridTuple3},
- * where first object is {@link GridAffinityMessage} for {@link GridCacheAffinityFunction}, second object
- * is {@link GridAffinityMessage} for {@link GridCacheAffinityKeyMapper} and third object is affinity assignment
+ * where first object is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.CacheAffinityFunction}, second object
+ * is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} and third object is affinity assignment
* for given topology version.
*
* @param cacheName Cache name.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
deleted file mode 100644
index 308e822..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Cache affinity function context implementation. Simple bean that holds all required fields.
- */
-public class GridCacheAffinityFunctionContextImpl implements GridCacheAffinityFunctionContext {
- /** Topology snapshot. */
- private List<ClusterNode> topSnapshot;
-
- /** Previous affinity assignment. */
- private List<List<ClusterNode>> prevAssignment;
-
- /** Discovery event that caused this topology change. */
- private IgniteDiscoveryEvent discoEvt;
-
- /** Topology version. */
- private long topVer;
-
- /** Number of backups to assign. */
- private int backups;
-
- /**
- * @param topSnapshot Topology snapshot.
- * @param topVer Topology version.
- */
- public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment,
- IgniteDiscoveryEvent discoEvt, long topVer, int backups) {
- this.topSnapshot = topSnapshot;
- this.prevAssignment = prevAssignment;
- this.discoEvt = discoEvt;
- this.topVer = topVer;
- this.backups = backups;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public List<ClusterNode> previousAssignment(int part) {
- return prevAssignment.get(part);
- }
-
- /** {@inheritDoc} */
- @Override public List<ClusterNode> currentTopologySnapshot() {
- return topSnapshot;
- }
-
- /** {@inheritDoc} */
- @Override public long currentTopologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() {
- return discoEvt;
- }
-
- /** {@inheritDoc} */
- @Override public int backups() {
- return backups;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java
new file mode 100644
index 0000000..93fb87c
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheDefaultAffinityKeyMapper.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.portables.*;
+import org.apache.ignite.resources.*;
+import org.gridgain.grid.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.lang.annotation.*;
+import java.lang.reflect.*;
+
+/**
+ * Default key affinity mapper. If key class has annotation {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapped},
+ * then the value of annotated method or field will be used to get affinity value instead
+ * of the key itself. If there is no annotation, then the key is used as is.
+ * <p>
+ * Convenience affinity key adapter, {@link org.apache.ignite.cache.affinity.CacheAffinityKey} can be used in
+ * conjunction with this mapper to automatically provide custom affinity keys for cache keys.
+ * <p>
+ * If non-default affinity mapper is used, is should be provided via
+ * {@link GridCacheConfiguration#getAffinityMapper()} configuration property.
+ */
+public class CacheDefaultAffinityKeyMapper implements CacheAffinityKeyMapper {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Reflection cache. */
+ private GridReflectionCache reflectCache = new GridReflectionCache(
+ new P1<Field>() {
+ @Override public boolean apply(Field f) {
+ // Account for anonymous inner classes.
+ return f.getAnnotation(CacheAffinityKeyMapped.class) != null;
+ }
+ },
+ new P1<Method>() {
+ @Override public boolean apply(Method m) {
+ // Account for anonymous inner classes.
+ Annotation ann = m.getAnnotation(CacheAffinityKeyMapped.class);
+
+ if (ann != null) {
+ if (!F.isEmpty(m.getParameterTypes()))
+ throw new IllegalStateException("Method annotated with @GridCacheAffinityKey annotation " +
+ "cannot have parameters: " + m);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+ );
+
+ /** Logger. */
+ @IgniteLoggerResource
+ private transient IgniteLogger log;
+
+ /**
+ * If key class has annotation {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapped},
+ * then the value of annotated method or field will be used to get affinity value instead
+ * of the key itself. If there is no annotation, then the key is returned as is.
+ *
+ * @param key Key to get affinity key for.
+ * @return Affinity key for given key.
+ */
+ @Override public Object affinityKey(Object key) {
+ GridArgumentCheck.notNull(key, "key");
+
+ if (key instanceof PortableObject) {
+ PortableObject po = (PortableObject)key;
+
+ try {
+ PortableMetadata meta = po.metaData();
+
+ if (meta != null) {
+ String affKeyFieldName = meta.affinityKeyFieldName();
+
+ if (affKeyFieldName != null)
+ return po.field(affKeyFieldName);
+ }
+ }
+ catch (PortableException e) {
+ U.error(log, "Failed to get affinity field from portable object: " + key, e);
+ }
+ }
+ else {
+ try {
+ Object o = reflectCache.firstFieldValue(key);
+
+ if (o != null)
+ return o;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to access affinity field for key [field=" +
+ reflectCache.firstField(key.getClass()) + ", key=" + key + ']', e);
+ }
+
+ try {
+ Object o = reflectCache.firstMethodValue(key);
+
+ if (o != null)
+ return o;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to invoke affinity method for key [mtd=" +
+ reflectCache.firstMethod(key.getClass()) + ", key=" + key + ']', e);
+ }
+ }
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 1a31edd..9b9fe9f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -29,7 +29,6 @@ import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.cache.datastructures.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.*;
@@ -158,7 +157,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
private GridCacheDataStructures dataStructures;
/** Affinity impl. */
- private GridCacheAffinity<K> aff;
+ private IgniteCacheAffinity<K> aff;
/** Whether this cache is GGFS data cache. */
private boolean ggfsDataCache;
@@ -265,7 +264,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
qry = new GridCacheQueriesImpl<>(ctx, null);
dataStructures = new GridCacheDataStructuresImpl<>(ctx);
- aff = new GridCacheAffinityImpl<>(ctx);
+ aff = new CacheAffinityImpl<>(ctx);
}
/**
@@ -349,8 +348,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
return qry;
}
- /** {@inheritDoc} */
- @Override public GridCacheAffinity<K> affinity() {
+ public IgniteCacheAffinity<K> affinityProxy() {
return aff;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
index 84ac35a..4012e4f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAttributes.java
@@ -18,8 +18,8 @@
package org.gridgain.grid.kernal.processors.cache;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -185,11 +185,11 @@ public class GridCacheAttributes implements Externalizable {
affKeyBackups = cfg.getBackups();
- GridCacheAffinityFunction aff = cfg.getAffinity();
+ CacheAffinityFunction aff = cfg.getAffinity();
if (aff != null) {
- if (aff instanceof GridCacheConsistentHashAffinityFunction) {
- GridCacheConsistentHashAffinityFunction aff0 = (GridCacheConsistentHashAffinityFunction) aff;
+ if (aff instanceof CacheConsistentHashAffinityFunction) {
+ CacheConsistentHashAffinityFunction aff0 = (CacheConsistentHashAffinityFunction) aff;
affInclNeighbors = aff0.isExcludeNeighbors();
affReplicas = aff0.getDefaultReplicas();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
deleted file mode 100644
index efcb798..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.portables.*;
-import org.apache.ignite.resources.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.lang.annotation.*;
-import java.lang.reflect.*;
-
-/**
- * Default key affinity mapper. If key class has annotation {@link GridCacheAffinityKeyMapped},
- * then the value of annotated method or field will be used to get affinity value instead
- * of the key itself. If there is no annotation, then the key is used as is.
- * <p>
- * Convenience affinity key adapter, {@link GridCacheAffinityKey} can be used in
- * conjunction with this mapper to automatically provide custom affinity keys for cache keys.
- * <p>
- * If non-default affinity mapper is used, is should be provided via
- * {@link GridCacheConfiguration#getAffinityMapper()} configuration property.
- */
-public class GridCacheDefaultAffinityKeyMapper implements GridCacheAffinityKeyMapper {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Reflection cache. */
- private GridReflectionCache reflectCache = new GridReflectionCache(
- new P1<Field>() {
- @Override public boolean apply(Field f) {
- // Account for anonymous inner classes.
- return f.getAnnotation(GridCacheAffinityKeyMapped.class) != null;
- }
- },
- new P1<Method>() {
- @Override public boolean apply(Method m) {
- // Account for anonymous inner classes.
- Annotation ann = m.getAnnotation(GridCacheAffinityKeyMapped.class);
-
- if (ann != null) {
- if (!F.isEmpty(m.getParameterTypes()))
- throw new IllegalStateException("Method annotated with @GridCacheAffinityKey annotation " +
- "cannot have parameters: " + m);
-
- return true;
- }
-
- return false;
- }
- }
- );
-
- /** Logger. */
- @IgniteLoggerResource
- private transient IgniteLogger log;
-
- /**
- * If key class has annotation {@link GridCacheAffinityKeyMapped},
- * then the value of annotated method or field will be used to get affinity value instead
- * of the key itself. If there is no annotation, then the key is returned as is.
- *
- * @param key Key to get affinity key for.
- * @return Affinity key for given key.
- */
- @Override public Object affinityKey(Object key) {
- GridArgumentCheck.notNull(key, "key");
-
- if (key instanceof PortableObject) {
- PortableObject po = (PortableObject)key;
-
- try {
- PortableMetadata meta = po.metaData();
-
- if (meta != null) {
- String affKeyFieldName = meta.affinityKeyFieldName();
-
- if (affKeyFieldName != null)
- return po.field(affKeyFieldName);
- }
- }
- catch (PortableException e) {
- U.error(log, "Failed to get affinity field from portable object: " + key, e);
- }
- }
- else {
- try {
- Object o = reflectCache.firstFieldValue(key);
-
- if (o != null)
- return o;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to access affinity field for key [field=" +
- reflectCache.firstField(key.getClass()) + ", key=" + key + ']', e);
- }
-
- try {
- Object o = reflectCache.firstMethodValue(key);
-
- if (o != null)
- return o;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to invoke affinity method for key [mtd=" +
- reflectCache.firstMethod(key.getClass()) + ", key=" + key + ']', e);
- }
- }
-
- return key;
- }
-
- /** {@inheritDoc} */
- @Override public void reset() {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
index 8d1fbb2..d55b507 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryImpl.java
@@ -249,7 +249,7 @@ public class GridCacheEntryImpl<K, V> implements GridCacheEntry<K, V>, Externali
@Override public int partition() {
GridCacheEntryEx<K, V> e = unwrapNoCreate();
- return e == null ? ctx.cache().affinity().partition(key) : e.partition();
+ return e == null ? ctx.grid().affinity(ctx.cache().name()).partition(key) : e.partition();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
index 950092e..536de58 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,8 +19,7 @@ package org.gridgain.grid.kernal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
import org.gridgain.grid.util.future.*;
import org.jetbrains.annotations.*;
@@ -38,7 +37,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
protected final IgniteLogger log;
/** Affinity. */
- protected final GridCacheAffinityFunction aff;
+ protected final CacheAffinityFunction aff;
/** Start future (always completed by default). */
private final IgniteFuture finFut;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index d4b971d..6ff14dd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -26,10 +26,10 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.spi.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.cache.affinity.consistenthash.*;
-import org.gridgain.grid.cache.affinity.fair.*;
-import org.gridgain.grid.cache.affinity.rendezvous.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
import org.gridgain.grid.cache.store.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.*;
@@ -138,16 +138,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfg.getAffinity() == null) {
if (cfg.getCacheMode() == PARTITIONED) {
- GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction();
+ CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction();
- aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver());
+ aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver());
cfg.setAffinity(aff);
}
else if (cfg.getCacheMode() == REPLICATED) {
- GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(false, 512);
+ CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(false, 512);
- aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver());
+ aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver());
cfg.setAffinity(aff);
@@ -158,11 +158,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
else {
if (cfg.getCacheMode() == PARTITIONED) {
- if (cfg.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) {
- GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cfg.getAffinity();
+ if (cfg.getAffinity() instanceof CacheConsistentHashAffinityFunction) {
+ CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cfg.getAffinity();
if (aff.getHashIdResolver() == null)
- aff.setHashIdResolver(new GridCacheAffinityNodeAddressHashResolver());
+ aff.setHashIdResolver(new CacheAffinityNodeAddressHashResolver());
}
}
}
@@ -171,7 +171,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.setBackups(Integer.MAX_VALUE);
if (cfg.getAffinityMapper() == null)
- cfg.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper());
+ cfg.setAffinityMapper(new CacheDefaultAffinityKeyMapper());
ctx.ggfsHelper().preProcessCacheConfiguration(cfg);
@@ -268,20 +268,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
private void validate(IgniteConfiguration c, GridCacheConfiguration cc) throws IgniteCheckedException {
if (cc.getCacheMode() == REPLICATED) {
- if (cc.getAffinity() instanceof GridCachePartitionFairAffinity)
+ if (cc.getAffinity() instanceof CachePartitionFairAffinity)
throw new IgniteCheckedException("REPLICATED cache can not be started with GridCachePartitionFairAffinity" +
" [cacheName=" + cc.getName() + ']');
- if (cc.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) {
- GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cc.getAffinity();
+ if (cc.getAffinity() instanceof CacheConsistentHashAffinityFunction) {
+ CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cc.getAffinity();
if (aff.isExcludeNeighbors())
throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " +
"GridCacheConsistentHashAffinityFunction cannot be set [cacheName=" + cc.getName() + ']');
}
- if (cc.getAffinity() instanceof GridCacheRendezvousAffinityFunction) {
- GridCacheRendezvousAffinityFunction aff = (GridCacheRendezvousAffinityFunction)cc.getAffinity();
+ if (cc.getAffinity() instanceof CacheRendezvousAffinityFunction) {
+ CacheRendezvousAffinityFunction aff = (CacheRendezvousAffinityFunction)cc.getAffinity();
if (aff.isExcludeNeighbors())
throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " +
@@ -965,10 +965,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheAdapter cache : ctx.cache().internalCaches()) {
GridCacheConfiguration cfg = cache.configuration();
- if (cfg.getAffinity() instanceof GridCacheConsistentHashAffinityFunction) {
- GridCacheConsistentHashAffinityFunction aff = (GridCacheConsistentHashAffinityFunction)cfg.getAffinity();
+ if (cfg.getAffinity() instanceof CacheConsistentHashAffinityFunction) {
+ CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cfg.getAffinity();
- GridCacheAffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver();
+ CacheAffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver();
assert hashIdRslvr != null;
@@ -1829,12 +1829,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class LocalAffinityFunction implements GridCacheAffinityFunction {
+ private static class LocalAffinityFunction implements CacheAffinityFunction {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext affCtx) {
+ @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) {
ClusterNode locNode = null;
for (ClusterNode n : affCtx.currentTopologySnapshot()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index ac13deb..caaa9a9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.cache.datastructures.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.processors.cache.affinity.*;
@@ -71,7 +70,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
private GridCacheDataStructures dataStructures;
/** Affinity. */
- private GridCacheAffinity<K> aff;
+ private IgniteCacheAffinity<K> aff;
/**
* Empty constructor required for {@link Externalizable}.
@@ -99,7 +98,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries());
dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures());
- aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity());
+
+ Ignite ignite = ctx.grid();
+ aff = new CacheAffinityProxy<>(ctx, ignite.<K>affinity(ctx.cache().name()));
}
/**
@@ -144,11 +145,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public GridCacheAffinity<K> affinity() {
- return aff;
- }
-
- /** {@inheritDoc} */
@Override public GridCacheDataStructures dataStructures() {
return dataStructures;
}
@@ -1862,7 +1858,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries());
dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures());
- aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity());
+ aff = new CacheAffinityProxy(ctx, ctx.grid().affinity(ctx.cache().name()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java
new file mode 100644
index 0000000..e6112eb
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityImpl.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.affinity;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.portables.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Affinity interface implementation.
+ */
+public class CacheAffinityImpl<K, V> implements IgniteCacheAffinity<K> {
+ /** Cache context. */
+ private GridCacheContext<K, V> cctx;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /**
+ * @param cctx Context.
+ */
+ public CacheAffinityImpl(GridCacheContext<K, V> cctx) {
+ this.cctx = cctx;
+
+ log = cctx.logger(getClass());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return cctx.config().getAffinity().partitions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(K key) {
+ A.notNull(key, "key");
+
+ return cctx.affinity().partition(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPrimary(ClusterNode n, K key) {
+ A.notNull(n, "n", key, "key");
+
+ return cctx.affinity().primary(n, key, topologyVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBackup(ClusterNode n, K key) {
+ A.notNull(n, "n", key, "key");
+
+ return cctx.affinity().backups(key, topologyVersion()).contains(n);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+ A.notNull(n, "n", key, "key");
+
+ return cctx.affinity().belongs(n, key, topologyVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] primaryPartitions(ClusterNode n) {
+ A.notNull(n, "n");
+
+ long topVer = cctx.discovery().topologyVersion();
+
+ Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
+
+ return U.toIntArray(parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] backupPartitions(ClusterNode n) {
+ A.notNull(n, "n");
+
+ long topVer = cctx.discovery().topologyVersion();
+
+ Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
+
+ return U.toIntArray(parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] allPartitions(ClusterNode n) {
+ A.notNull(n, "p");
+
+ Collection<Integer> parts = new HashSet<>();
+
+ long topVer = cctx.discovery().topologyVersion();
+
+ for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
+ for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {
+ if (n.id().equals(affNode.id())) {
+ parts.add(part);
+
+ break;
+ }
+ }
+ }
+
+ return U.toIntArray(parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode mapPartitionToNode(int part) {
+ A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");
+
+ return F.first(cctx.affinity().nodes(part, topologyVersion()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+ A.notNull(parts, "parts");
+
+ Map<Integer, ClusterNode> map = new HashMap<>();
+
+ if (!F.isEmpty(parts)) {
+ for (int p : parts)
+ map.put(p, mapPartitionToNode(p));
+ }
+
+ return map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(K key) {
+ A.notNull(key, "key");
+
+ if (cctx.portableEnabled()) {
+ try {
+ key = (K)cctx.marshalToPortable(key);
+ }
+ catch (PortableException e) {
+ U.error(log, "Failed to marshal key to portable: " + key, e);
+ }
+ }
+
+ return cctx.config().getAffinityMapper().affinityKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public ClusterNode mapKeyToNode(K key) {
+ A.notNull(key, "key");
+
+ return F.first(mapKeysToNodes(F.asList(key)).keySet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
+ A.notNull(keys, "keys");
+
+ long topVer = topologyVersion();
+
+ int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size();
+
+ // Must return empty map if no alive nodes present or keys is empty.
+ Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f);
+
+ for (K key : keys) {
+ ClusterNode primary = cctx.affinity().primary(key, topVer);
+
+ if (primary != null) {
+ Collection<K> mapped = res.get(primary);
+
+ if (mapped == null) {
+ mapped = new ArrayList<>(Math.max(keys.size() / nodesCnt, 16));
+
+ res.put(primary, mapped);
+ }
+
+ mapped.add(key);
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+ A.notNull(key, "key");
+
+ return cctx.affinity().nodes(partition(key), topologyVersion());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+ A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");
+
+ return cctx.affinity().nodes(part, topologyVersion());
+ }
+
+ /**
+ * Gets current topology version.
+ *
+ * @return Topology version.
+ */
+ private long topologyVersion() {
+ return cctx.affinity().affinityTopologyVersion();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java
new file mode 100644
index 0000000..a204aac
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/CacheAffinityProxy.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.affinity;
+
+import org.apache.ignite.IgniteCacheAffinity;
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Affinity interface implementation.
+ */
+public class CacheAffinityProxy<K, V> implements IgniteCacheAffinity<K>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache gateway. */
+ private GridCacheGateway<K, V> gate;
+
+ /** Affinity delegate. */
+ private IgniteCacheAffinity<K> delegate;
+
+ /** Context. */
+ private GridCacheContext<K, V> cctx;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public CacheAffinityProxy() {
+ // No-op.
+ }
+
+ /**
+ * @param cctx Context.
+ * @param delegate Delegate object.
+ */
+ public CacheAffinityProxy(GridCacheContext<K, V> cctx, IgniteCacheAffinity<K> delegate) {
+ gate = cctx.gate();
+ this.delegate = delegate;
+ this.cctx = cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.partitions();
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.partition(key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPrimary(ClusterNode n, K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.isPrimary(n, key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isBackup(ClusterNode n, K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.isBackup(n, key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.isPrimaryOrBackup(n, key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] primaryPartitions(ClusterNode n) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.primaryPartitions(n);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] backupPartitions(ClusterNode n) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.backupPartitions(n);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] allPartitions(ClusterNode n) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.allPartitions(n);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode mapPartitionToNode(int part) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapPartitionToNode(part);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapPartitionsToNodes(parts);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.affinityKey(key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+
+ /** {@inheritDoc} */
+ @Override @Nullable public ClusterNode mapKeyToNode(K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapKeyToNode(key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapKeysToNodes(keys);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapKeyToPrimaryAndBackups(key);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+ GridCacheProjectionImpl<K, V> old = gate.enter(null);
+
+ try {
+ return delegate.mapPartitionToPrimaryAndBackups(part);
+ }
+ finally {
+ gate.leave(old);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cctx = (GridCacheContext<K, V>)in.readObject();
+ }
+
+ /**
+ * Reconstructs object on unmarshalling.
+ *
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
+ */
+ private Object readResolve() throws ObjectStreamException {
+ return cctx.grid().affinity(cctx.cache().name());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java
deleted file mode 100644
index a820f9c..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.affinity;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.portables.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Affinity interface implementation.
- */
-public class GridCacheAffinityImpl<K, V> implements GridCacheAffinity<K> {
- /** Cache context. */
- private GridCacheContext<K, V> cctx;
-
- /** Logger. */
- private IgniteLogger log;
-
- /**
- * @param cctx Context.
- */
- public GridCacheAffinityImpl(GridCacheContext<K, V> cctx) {
- this.cctx = cctx;
-
- log = cctx.logger(getClass());
- }
-
- /** {@inheritDoc} */
- @Override public int partitions() {
- return cctx.config().getAffinity().partitions();
- }
-
- /** {@inheritDoc} */
- @Override public int partition(K key) {
- A.notNull(key, "key");
-
- return cctx.affinity().partition(key);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPrimary(ClusterNode n, K key) {
- A.notNull(n, "n", key, "key");
-
- return cctx.affinity().primary(n, key, topologyVersion());
- }
-
- /** {@inheritDoc} */
- @Override public boolean isBackup(ClusterNode n, K key) {
- A.notNull(n, "n", key, "key");
-
- return cctx.affinity().backups(key, topologyVersion()).contains(n);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
- A.notNull(n, "n", key, "key");
-
- return cctx.affinity().belongs(n, key, topologyVersion());
- }
-
- /** {@inheritDoc} */
- @Override public int[] primaryPartitions(ClusterNode n) {
- A.notNull(n, "n");
-
- long topVer = cctx.discovery().topologyVersion();
-
- Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
-
- return U.toIntArray(parts);
- }
-
- /** {@inheritDoc} */
- @Override public int[] backupPartitions(ClusterNode n) {
- A.notNull(n, "n");
-
- long topVer = cctx.discovery().topologyVersion();
-
- Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
-
- return U.toIntArray(parts);
- }
-
- /** {@inheritDoc} */
- @Override public int[] allPartitions(ClusterNode n) {
- A.notNull(n, "p");
-
- Collection<Integer> parts = new HashSet<>();
-
- long topVer = cctx.discovery().topologyVersion();
-
- for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
- for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {
- if (n.id().equals(affNode.id())) {
- parts.add(part);
-
- break;
- }
- }
- }
-
- return U.toIntArray(parts);
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode mapPartitionToNode(int part) {
- A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");
-
- return F.first(cctx.affinity().nodes(part, topologyVersion()));
- }
-
- /** {@inheritDoc} */
- @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
- A.notNull(parts, "parts");
-
- Map<Integer, ClusterNode> map = new HashMap<>();
-
- if (!F.isEmpty(parts)) {
- for (int p : parts)
- map.put(p, mapPartitionToNode(p));
- }
-
- return map;
- }
-
- /** {@inheritDoc} */
- @Override public Object affinityKey(K key) {
- A.notNull(key, "key");
-
- if (cctx.portableEnabled()) {
- try {
- key = (K)cctx.marshalToPortable(key);
- }
- catch (PortableException e) {
- U.error(log, "Failed to marshal key to portable: " + key, e);
- }
- }
-
- return cctx.config().getAffinityMapper().affinityKey(key);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public ClusterNode mapKeyToNode(K key) {
- A.notNull(key, "key");
-
- return F.first(mapKeysToNodes(F.asList(key)).keySet());
- }
-
- /** {@inheritDoc} */
- @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
- A.notNull(keys, "keys");
-
- long topVer = topologyVersion();
-
- int nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size();
-
- // Must return empty map if no alive nodes present or keys is empty.
- Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f);
-
- for (K key : keys) {
- ClusterNode primary = cctx.affinity().primary(key, topVer);
-
- if (primary != null) {
- Collection<K> mapped = res.get(primary);
-
- if (mapped == null) {
- mapped = new ArrayList<>(Math.max(keys.size() / nodesCnt, 16));
-
- res.put(primary, mapped);
- }
-
- mapped.add(key);
- }
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
- A.notNull(key, "key");
-
- return cctx.affinity().nodes(partition(key), topologyVersion());
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
- A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");
-
- return cctx.affinity().nodes(part, topologyVersion());
- }
-
- /**
- * Gets current topology version.
- *
- * @return Topology version.
- */
- private long topologyVersion() {
- return cctx.affinity().affinityTopologyVersion();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java
deleted file mode 100644
index f4e61f7..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/affinity/GridCacheAffinityProxy.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.gridgain.grid.cache.affinity.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Affinity interface implementation.
- */
-public class GridCacheAffinityProxy<K, V> implements GridCacheAffinity<K>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Cache gateway. */
- private GridCacheGateway<K, V> gate;
-
- /** Affinity delegate. */
- private GridCacheAffinity<K> delegate;
-
- /** Context. */
- private GridCacheContext<K, V> cctx;
-
- /**
- * Required by {@link Externalizable}.
- */
- public GridCacheAffinityProxy() {
- // No-op.
- }
-
- /**
- * @param cctx Context.
- * @param delegate Delegate object.
- */
- public GridCacheAffinityProxy(GridCacheContext<K, V> cctx, GridCacheAffinity<K> delegate) {
- gate = cctx.gate();
- this.delegate = delegate;
- this.cctx = cctx;
- }
-
- /** {@inheritDoc} */
- @Override public int partitions() {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.partitions();
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public int partition(K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.partition(key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPrimary(ClusterNode n, K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.isPrimary(n, key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean isBackup(ClusterNode n, K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.isBackup(n, key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.isPrimaryOrBackup(n, key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public int[] primaryPartitions(ClusterNode n) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.primaryPartitions(n);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public int[] backupPartitions(ClusterNode n) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.backupPartitions(n);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public int[] allPartitions(ClusterNode n) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.allPartitions(n);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode mapPartitionToNode(int part) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapPartitionToNode(part);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapPartitionsToNodes(parts);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object affinityKey(K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.affinityKey(key);
- }
- finally {
- gate.leave(old);
- }
- }
-
-
- /** {@inheritDoc} */
- @Override @Nullable public ClusterNode mapKeyToNode(K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapKeyToNode(key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapKeysToNodes(keys);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapKeyToPrimaryAndBackups(key);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
- GridCacheProjectionImpl<K, V> old = gate.enter(null);
-
- try {
- return delegate.mapPartitionToPrimaryAndBackups(part);
- }
- finally {
- gate.leave(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(cctx);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cctx = (GridCacheContext<K, V>)in.readObject();
- }
-
- /**
- * Reconstructs object on unmarshalling.
- *
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
- */
- private Object readResolve() throws ObjectStreamException {
- return cctx.grid().cache(cctx.cache().name()).affinity();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java
index e51a9fc..0b18de2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheInternalKeyImpl.java
@@ -17,7 +17,7 @@
package org.gridgain.grid.kernal.processors.cache.datastructures;
-import org.gridgain.grid.cache.affinity.GridCacheAffinityKeyMapped;
+import org.apache.ignite.cache.affinity.CacheAffinityKeyMapped;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -52,7 +52,7 @@ public class GridCacheInternalKeyImpl implements GridCacheInternalKey, Externali
}
/** {@inheritDoc} */
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
@Override public String name() {
return name;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java
index b8d603b..aaa11fe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueAdapter.java
@@ -19,7 +19,7 @@ package org.gridgain.grid.kernal.processors.cache.datastructures;
import org.apache.ignite.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.cache.datastructures.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.tostring.*;
@@ -636,7 +636,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
/**
* @return Item affinity key.
*/
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
public Object affinityKey() {
return queueName();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
index cacc933..4f2dc55 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.cache.datastructures.*;
import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -556,7 +556,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
/**
* @return Item affinity key.
*/
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
public Object affinityKey() {
return setName;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 04c86a9..83159f0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.managers.discovery.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
@@ -312,10 +312,10 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
* @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
*/
private boolean canCalculateAffinity(GridCacheContext<K, V> cacheCtx) {
- GridCacheAffinityFunction affFunc = cacheCtx.config().getAffinity();
+ CacheAffinityFunction affFunc = cacheCtx.config().getAffinity();
// Do not request affinity from remote nodes if affinity function is not centralized.
- if (!U.hasAnnotation(affFunc, GridCacheCentralizedAffinityFunction.class))
+ if (!U.hasAnnotation(affFunc, CacheCentralizedAffinityFunction.class))
return true;
// If local node did not initiate exchange or local node is the only cache node in grid.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index 7184c7b..d449396 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -597,7 +597,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
info.unmarshalValue(cctx, cctx.deploy().globalLoader());
// Entries available locally in DHT should not be loaded into near cache for reading.
- if (!cctx.cache().affinity().isPrimaryOrBackup(cctx.localNode(), info.key())) {
+ if (!cctx.grid().affinity(ctx.cache().cache().name()).isPrimaryOrBackup(cctx.localNode(), info.key())) {
GridNearCacheEntry<K, V> entry = cache().entryExx(info.key(), topVer);
GridCacheVersion saved = savedVers.get(info.key());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java
index 9817fd6..f9ea82f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java
@@ -24,7 +24,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.*;
import org.gridgain.grid.util.*;
@@ -1050,7 +1050,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private final String cn = cacheName;
/** */
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
private final Object ak = affKey;
@Override public Object execute() {
@@ -1074,7 +1074,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private final String cn = cacheName;
/** */
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
private final Object ak = affKey;
@Override public Object execute() {
@@ -1142,7 +1142,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private final String cn = cacheName;
/** */
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
private final Object ak = affKey;
@Nullable @Override public Object execute() {
@@ -1163,7 +1163,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private final String cn = cacheName;
/** */
- @GridCacheAffinityKeyMapped
+ @CacheAffinityKeyMapped
private final Object ak = affKey;
@Nullable @Override public Object execute() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java
index 03e1c80..79ab954 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -22,7 +22,6 @@ import org.apache.ignite.dataload.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.util.typedef.*;
import org.jetbrains.annotations.*;
@@ -243,7 +242,7 @@ public class GridDataLoadCacheUpdaters {
Map<Integer, Collection<K>> rmvPartMap = null;
Map<Integer, Map<K, V>> putPartMap = null;
- GridCacheAffinity<K> aff = cache.ignite().<K, V>cache(cache.getName()).affinity();
+ IgniteCacheAffinity<K> aff = cache.ignite().affinity(cache.getName());
for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
index 4408966..8fe4602 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
@@ -26,7 +26,7 @@ import org.apache.ignite.thread.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.*;
+import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
@@ -71,6 +71,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
/** Data cache. */
private GridCache<Object, Object> dataCache;
+ /** Affinity */
+ private IgniteCacheAffinity<Object> cacheAff;
+
/** */
private IgniteFuture<?> dataCacheStartFut;
@@ -153,6 +156,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
dataCachePrj = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName());
dataCache = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName());
+ cacheAff = ggfsCtx.kernalContext().grid().affinity(dataCache.name());
+
dataCacheStartFut = ggfsCtx.kernalContext().cache().internalCache(ggfsCtx.configuration().getDataCacheName())
.preloader().startFuture();
@@ -164,7 +169,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
assert dataCachePrj != null;
- GridCacheAffinityKeyMapper mapper = ggfsCtx.kernalContext().cache()
+ CacheAffinityKeyMapper mapper = ggfsCtx.kernalContext().cache()
.internalCache(ggfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper();
grpSize = mapper instanceof IgniteFsGroupDataBlocksKeyMapper ?
@@ -275,13 +280,13 @@ public class GridGgfsDataManager extends GridGgfsManager {
UUID nodeId = ggfsCtx.kernalContext().localNodeId();
- if (prevAffKey != null && dataCache.affinity().mapKeyToNode(prevAffKey).isLocal())
+ if (prevAffKey != null && cacheAff.mapKeyToNode(prevAffKey).isLocal())
return prevAffKey;
while (true) {
IgniteUuid key = new IgniteUuid(nodeId, affKeyGen.getAndIncrement());
- if (dataCache.affinity().mapKeyToNode(key).isLocal())
+ if (cacheAff.mapKeyToNode(key).isLocal())
return key;
}
}
@@ -293,7 +298,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
* @return Primary node for this key.
*/
public ClusterNode affinityNode(Object affinityKey) {
- return dataCache.affinity().mapKeyToNode(affinityKey);
+ return cacheAff.mapKeyToNode(affinityKey);
}
/**
@@ -349,7 +354,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
GridGgfsBlockKey key = new GridGgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(),
fileInfo.evictExclude(), i);
- Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
+ Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups(key);
assert affNodes != null && !affNodes.isEmpty();
@@ -782,8 +787,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
if (info.affinityKey() != null) {
Collection<IgniteFsBlockLocation> res = new LinkedList<>();
- splitBlocks(start, len, maxLen, dataCache.affinity().mapKeyToPrimaryAndBackups(
- new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res);
+ splitBlocks(start, len, maxLen, cacheAff.mapKeyToPrimaryAndBackups(
+ new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 0)), res);
return res;
}
@@ -818,8 +823,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
if (range.belongs(pos)) {
long partEnd = Math.min(range.endOffset() + 1, end);
- Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(
- range.affinityKey());
+ Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups(
+ range.affinityKey());
if (log.isDebugEnabled())
log.debug("Calculated affinity for range [start=" + pos + ", end=" + partEnd +
@@ -901,7 +906,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
GridGgfsBlockKey key = new GridGgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(),
grpIdx * grpSize);
- Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
+ Collection<ClusterNode> affNodes = cacheAff.mapKeyToPrimaryAndBackups(key);
if (log.isDebugEnabled())
log.debug("Mapped key to nodes [key=" + key + ", nodes=" + F.nodeIds(affNodes) +
@@ -1453,7 +1458,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
// Will update range if necessary.
GridGgfsBlockKey key = createBlockKey(block, fileInfo, affinityRange);
- ClusterNode primaryNode = dataCachePrj.cache().affinity().mapKeyToNode(key);
+ Ignite ignite = dataCachePrj.gridProjection().ignite();
+ ClusterNode primaryNode = ignite.affinity(dataCachePrj.name()).mapKeyToNode(key);
if (block == first) {
off = (int)blockStartOff;