You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/05/05 10:16:53 UTC
[08/15] incubator-ignite git commit: #ignite-732: IgniteCache.size()
should not fail in case of topology changes.
#ignite-732: IgniteCache.size() should not fail in case of topology changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/139aa270
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/139aa270
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/139aa270
Branch: refs/heads/ignite-gg-9819
Commit: 139aa270ae61494c0757867f2dc531ec7251b1da
Parents: 0885ac0
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Apr 30 18:43:56 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Apr 30 18:43:56 2015 +0300
----------------------------------------------------------------------
.../ignite/compute/ComputeTaskAdapter.java | 14 +-
.../processors/cache/GridCacheAdapter.java | 503 ++++++++++---------
.../processors/cache/GridCacheProcessor.java | 109 ++--
3 files changed, 349 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index c2ad198..87081fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,15 +24,16 @@ import java.util.*;
/**
* Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code GridComputeTaskAdapter} can be used:
+ * how {@code ComputeTaskAdapter} can be used:
* <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> {
+ * public class MyFooBarTask extends ComputeTaskAdapter<String, String> {
* // Inject load balancer.
* @LoadBalancerResource
* ComputeLoadBalancer balancer;
*
* // Map jobs to grid nodes.
- * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws IgniteCheckedException {
+ * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg)
+ * throws IgniteCheckedException {
* Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size());
*
* // In more complex cases, you can actually do
@@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
* <p>
* If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}),
* then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance
- * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that
- * remote node either failed or job execution was rejected before it got a chance to start. In all
+ * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
+ * which means that remote node either failed or job execution was rejected before it got a chance to start. In all
* other cases the exception will be rethrown which will ultimately cause task to fail.
*
* @param res Received remote grid executable result.
@@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
* @throws IgniteException If handling a job result caused an error effectively rejecting
* a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method.
*/
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
+ throws IgniteException {
IgniteException e = res.getException();
// Try to failover if result is failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..39f19b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -75,6 +75,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
+ /** Failed result. */
+ private static final Object FAIL = new Integer(-1);
+
/** clearLocally() split threshold. */
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
@@ -882,7 +885,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet() {
- return entrySet((CacheEntryPredicate[]) null);
+ return entrySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -897,17 +900,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[]) null);
+ return keySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return primaryKeySet((CacheEntryPredicate[]) null);
+ return primaryKeySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Collection<V> values() {
- return values((CacheEntryPredicate[]) null);
+ return values((CacheEntryPredicate[])null);
}
/**
@@ -1080,36 +1083,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public void clear() throws IgniteCheckedException {
- // Clear local cache synchronously.
- clearLocally();
-
- clearRemotes(0, new GlobalClearAllCallable(name()));
+ clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
}
/** {@inheritDoc} */
@Override public void clear(K key) throws IgniteCheckedException {
- // Clear local cache synchronously.
- clearLocally(key);
-
- clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+ clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+ Collections.singleton(key)));
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException {
- // Clear local cache synchronously.
- clearLocallyAll(keys);
-
- clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+ clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+ keys));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(K key) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+ return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+ Collections.singleton(key)));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+ return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
+ keys));
}
/**
@@ -1118,19 +1116,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param clearCall Global clear callable object.
* @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes.
*/
- private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException {
+ private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException {
try {
- // Send job to remote nodes only.
- Collection<ClusterNode> nodes =
- ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
-
IgniteInternalFuture<Object> fut = null;
- if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+ ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
- fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
- }
+ fut = new ClearFuture(ctx, clearCall);
if (fut != null)
fut.get();
@@ -1149,19 +1141,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync() {
- return clearAsync(new GlobalClearAllCallable(name()));
+ return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
}
/**
* @param clearCall Global clear callable object.
* @return Future.
*/
- private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+ private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
if (!nodes.isEmpty()) {
- IgniteInternalFuture<Object> fut =
- ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+ IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall);
return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
@Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException {
@@ -2117,7 +2108,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2145,7 +2136,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
- Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
return tx.invokeAsync(ctx, invokeMap, args);
}
@@ -2371,7 +2362,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@Override public String toString() {
@@ -2526,7 +2517,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
@Override public String toString() {
@@ -2915,7 +2906,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return (GridCacheReturn) tx.putAllAsync(ctx,
+ return tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3017,7 +3008,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.deploy().registerClass(val);
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ ctx.equalsValArray(val)).get().success();
}
@Override public String toString() {
@@ -3230,10 +3221,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
return txStart(
- concurrency,
- isolation,
- cfg.getDefaultTxTimeout(),
- 0
+ concurrency,
+ isolation,
+ cfg.getDefaultTxTimeout(),
+ 0
);
}
@@ -3576,22 +3567,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0);
- IgniteInternalFuture<Collection<Integer>> fut =
- ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
-
- return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() {
- @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut)
- throws IgniteCheckedException {
- Collection<Integer> res = fut.get();
-
- int totalSize = 0;
-
- for (Integer size : res)
- totalSize += size;
-
- return totalSize;
- }
- });
+ return new SizeFuture(peekModes, ctx, modes.near);
}
/** {@inheritDoc} */
@@ -3675,7 +3651,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return F.iterator(iterator(),
new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
private IgniteCacheExpiryPolicy expiryPlc =
- ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+ ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
@Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3909,50 +3885,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Gets cache global size (with or without backups).
- *
- * @param primaryOnly {@code True} if only primary sizes should be included.
- * @return Global size.
- * @throws IgniteCheckedException If internal task execution failed.
- */
- private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
- try {
- // Send job to remote nodes only.
- Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
- IgniteInternalFuture<Collection<Integer>> fut = null;
-
- if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
- fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes);
- }
-
- // Get local value.
- int globalSize = primaryOnly ? primarySize() : size();
-
- if (fut != null) {
- for (Integer i : fut.get())
- globalSize += i;
- }
-
- return globalSize;
- }
- catch (ClusterGroupEmptyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
- return primaryOnly ? primarySize() : size();
- }
- catch (ComputeTaskTimeoutCheckedException e) {
- U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
- "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
- throw e;
- }
- }
-
- /**
* @param op Cache operation.
* @param <T> Return type.
* @return Operation result.
@@ -4893,47 +4825,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Internal callable which performs clear operation on a cache with the given name.
- */
- @GridInternal
- private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable {
- /** Cache name. */
- protected String cacheName;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- protected Ignite ignite;
-
- /**
- * Empty constructor for serialization.
- */
- public GlobalClearCallable() {
- // No-op.
- }
-
- /**
- * @param cacheName Cache name.
- */
- protected GlobalClearCallable(String cacheName) {
- this.cacheName = cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- }
- }
-
- /**
* Global clear all.
*/
@GridInternal
- private static class GlobalClearAllCallable extends GlobalClearCallable {
+ private static class GlobalClearAllCallable extends TopologyVersionAwareCallable {
/** */
private static final long serialVersionUID = 0L;
@@ -4946,24 +4841,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
*/
- private GlobalClearAllCallable(String cacheName) {
- super(cacheName);
+ private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) {
+ super(cacheName, topVer);
}
/** {@inheritDoc} */
- @Override public Object call() throws Exception {
+ @Override protected Object callLocal() {
((IgniteEx)ignite).cachex(cacheName).clearLocally();
return null;
}
+
+ /** {@inheritDoc} */
+ @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+ return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
+ }
}
/**
* Global clear keys.
*/
@GridInternal
- private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable {
+ private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable {
/** */
private static final long serialVersionUID = 0L;
@@ -4979,33 +4880,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param keys Keys to clear.
*/
- private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) {
- super(cacheName);
+ protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+ super(cacheName, topVer);
this.keys = keys;
}
/** {@inheritDoc} */
- @Override public Object call() throws Exception {
- ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
-
- return null;
+ @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+ return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- out.writeObject(keys);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- super.readExternal(in);
+ @Override protected Object callLocal() {
+ ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
- keys = (Set<K>) in.readObject();
+ return null;
}
}
@@ -5013,127 +4906,202 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* Internal callable for global size calculation.
*/
@GridInternal
- private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable {
+ private static class GlobalSizeCallable extends TopologyVersionAwareCallable {
/** */
private static final long serialVersionUID = 0L;
- /** Cache name. */
- private String cacheName;
-
/** Peek modes. */
private CachePeekMode[] peekModes;
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
+ /** Near enable. */
+ private boolean nearEnable;
/**
* Required by {@link Externalizable}.
*/
- public SizeCallable() {
+ public GlobalSizeCallable() {
// No-op.
}
/**
* @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
*/
- private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
- this.cacheName = cacheName;
+ private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) {
+ super(cacheName, topVer);
+
this.peekModes = peekModes;
+ this.nearEnable = nearEnable;
}
/** {@inheritDoc} */
- @Override public Integer applyx(Object o) throws IgniteCheckedException {
- IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
- assert cache != null : cacheName;
+ @Override protected Object callLocal() {
+ try {
+ IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
- return cache.localSize(peekModes);
+ return cache == null ? 0 : cache.localSize(peekModes);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
+ @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
+ IgniteClusterEx cluster = ctx.grid().cluster();
- out.writeInt(peekModes.length);
+ ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name());
- for (int i = 0; i < peekModes.length; i++)
- U.writeEnum(out, peekModes[i]);
+ return grp.nodes();
}
/** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
+ public String toString() {
+ return S.toString(GlobalSizeCallable.class, this);
+ }
+ }
- int len = in.readInt();
+ /**
+ * Cache size future.
+ */
+ private static class SizeFuture extends RetryFuture {
+ /** Size. */
+ private int size = 0;
- peekModes = new CachePeekMode[len];
+ /**
+ * @param peekModes Peek modes.
+ */
+ public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) {
+ super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near));
+ }
- for (int i = 0; i < len; i++)
- peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+ /** {@inheritDoc} */
+ @Override protected void onInit() {
+ size = 0;
}
/** {@inheritDoc} */
- public String toString() {
- return S.toString(SizeCallable.class, this);
+ @Override protected void onLocal(Object localRes) {
+ size += (Integer)localRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void allDone() {
+ onDone(size);
}
}
/**
- * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
- * operation on a cache with the given name.
+ * Cache clear future.
*/
- @GridInternal
- private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
+ private static class ClearFuture extends RetryFuture {
+ /**
+ */
+ public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall) {
+ super(ctx, clearCall);
+ }
- /** Cache name. */
- private String cacheName;
+ /** {@inheritDoc} */
+ @Override protected void onInit() {
+ // No-op.
+ }
- /** Primary only flag. */
- private boolean primaryOnly;
+ /** {@inheritDoc} */
+ @Override protected void onLocal(Object localRes) {
+ // No-op.
+ }
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
+ /** {@inheritDoc} */
+ @Override protected void allDone() {
+ onDone();
+ }
+ }
+
+ /**
+ * Retry future.
+ */
+ protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> {
+ /** Context. */
+ private final GridCacheContext ctx;
+
+ /** Callable. */
+ private final TopologyVersionAwareCallable call;
+
+ /** Max retries count before issuing an error. */
+ private volatile int retries = 32;
/**
- * Empty constructor for serialization.
*/
- public GlobalSizeCallable() {
- // No-op.
+ public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) {
+ this.ctx = ctx;
+ this.call = call;
+
+ init();
}
/**
- * @param cacheName Cache name.
- * @param primaryOnly Primary only flag.
+ * Init.
*/
- private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
- this.cacheName = cacheName;
- this.primaryOnly = primaryOnly;
- }
+ private void init() {
+ Collection<ClusterNode> nodes = call.nodes(ctx);
- /** {@inheritDoc} */
- @Override public Integer apply(Object o) {
- IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
+ call.topologyVersion(ctx.affinity().affinityTopologyVersion());
- return primaryOnly ? cache.primarySize() : cache.size();
- }
+ IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST,
+ F.asSet((Callable<Object>)call), nodes, true);
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- out.writeBoolean(primaryOnly);
- }
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() {
+ @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) {
+ try {
+ Collection<Object> res = fut.get();
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- primaryOnly = in.readBoolean();
+ onInit();
+
+ for (Object locRes : res) {
+ if (locRes == FAIL) {
+ if (retries-- > 0)
+ init();
+ else {
+ onDone(new ClusterTopologyException("Failed to wait topology."));
+
+ return;
+ }
+ }
+
+ onLocal(locRes);
+ }
+
+ allDone();
+ }
+ catch (IgniteCheckedException e) {
+ if (X.hasCause(e, ClusterTopologyException.class)) {
+ if (retries-- > 0)
+ init();
+ else
+ onDone(e);
+ }
+ else
+ onDone(e);
+ }
+ }
+ });
}
+
+ /**
+ * Init reducer.
+ */
+ protected abstract void onInit();
+
+ /**
+ * @param localRes Add local result to global result.
+ */
+ protected abstract void onLocal(Object localRes);
+
+ /**
+ * On done.
+ */
+ protected abstract void allDone();
}
/**
@@ -5697,4 +5665,89 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
}
}
+
+ /**
+ * Delayed callable class.
+ */
+ protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Affinity topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Cache name. */
+ protected String cacheName;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public TopologyVersionAwareCallable() {
+ // No-op.
+ }
+
+ /**
+ * @param topVer Affinity topology version.
+ */
+ public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ if (!compareTopologyVersions())
+ return FAIL;
+
+ Object res = callLocal();
+
+ if (!compareTopologyVersions())
+ return FAIL;
+ else
+ return res;
+ }
+
+ /**
+ * Call local.
+ *
+ * @return Local result.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected abstract Object callLocal() throws IgniteCheckedException;
+
+ /**
+ * @param ctx Grid cache context.
+ * @return Nodes to call.
+ */
+ protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx);
+
+ /**
+ * Compare topology versions.
+ */
+ public boolean compareTopologyVersions() {
+ GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache();
+
+ GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName);
+
+ if (cacheAdapter == null)
+ return false;
+
+ final GridCacheContext<K, V> ctx = cacheAdapter.context();
+
+ AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion();
+
+ return locTopVer.compareTo(topVer) == 0;
+ }
+
+ /**
+ * @param topVer Affinity topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..77fa104 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Must use JDK marshaller since it is used by discovery to fire custom events. */
private Marshaller marshaller = new JdkMarshaller();
+ /** Count down latch for caches. */
+ private CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
/**
* @param ctx Kernal context.
*/
@@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
+ try {
+ if (ctx.config().isDaemon())
+ return;
- ClusterNode locNode = ctx.discovery().localNode();
+ ClusterNode locNode = ctx.discovery().localNode();
- // Init cache plugin managers.
- final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+ // Init cache plugin managers.
+ final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration locCcfg = desc.cacheConfiguration();
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration locCcfg = desc.cacheConfiguration();
- CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+ CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
- cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
- }
+ cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+ }
- if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
- for (ClusterNode n : ctx.discovery().remoteNodes()) {
- checkTransactionConfiguration(n);
+ if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+ for (ClusterNode n : ctx.discovery().remoteNodes()) {
+ checkTransactionConfiguration(n);
- DeploymentMode locDepMode = ctx.config().getDeploymentMode();
- DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+ DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+ DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
- CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
+ CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
- if (rmtCfg != null) {
- CacheConfiguration locCfg = desc.cacheConfiguration();
+ if (rmtCfg != null) {
+ CacheConfiguration locCfg = desc.cacheConfiguration();
- checkCache(locCfg, rmtCfg, n);
+ checkCache(locCfg, rmtCfg, n);
- // Check plugin cache configurations.
- CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+ // Check plugin cache configurations.
+ CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- pluginMgr.validateRemotes(rmtCfg, n);
+ pluginMgr.validateRemotes(rmtCfg, n);
+ }
}
}
}
- }
- // Start dynamic caches received from collect discovery data.
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- boolean started = desc.onStart();
+ // Start dynamic caches received from collect discovery data.
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ boolean started = desc.onStart();
- assert started : "Failed to change started flag for locally configured cache: " + desc;
+ assert started : "Failed to change started flag for locally configured cache: " + desc;
- desc.clearRemoteConfigurations();
+ desc.clearRemoteConfigurations();
- CacheConfiguration ccfg = desc.cacheConfiguration();
+ CacheConfiguration ccfg = desc.cacheConfiguration();
- IgnitePredicate filter = ccfg.getNodeFilter();
+ IgnitePredicate filter = ccfg.getNodeFilter();
- if (filter.apply(locNode)) {
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+ if (filter.apply(locNode)) {
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+ CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+ GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
- ctx.dynamicDeploymentId(desc.deploymentId());
+ ctx.dynamicDeploymentId(desc.deploymentId());
- sharedCtx.addCacheContext(ctx);
+ sharedCtx.addCacheContext(ctx);
- GridCacheAdapter cache = ctx.cache();
+ GridCacheAdapter cache = ctx.cache();
- String name = ccfg.getName();
+ String name = ccfg.getName();
- caches.put(maskNull(name), cache);
+ caches.put(maskNull(name), cache);
- startCache(cache);
+ startCache(cache);
- jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+ jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+ }
}
}
+ finally {
+ cacheStartedLatch.countDown();
+ }
ctx.marshallerContext().onMarshallerCacheStarted(ctx);
@@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStop(boolean cancel) {
+ cacheStartedLatch.countDown();
+
if (ctx.config().isDaemon())
return;
@@ -2686,6 +2696,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Getting internal cache adapter: " + name);
+ try {
+ cacheStartedLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Failed to wait starting caches.");
+ }
+
return (GridCacheAdapter<K, V>)caches.get(maskNull(name));
}