You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/05 17:26:37 UTC
[44/50] incubator-ignite git commit: Revert "#ignite-732: IgniteCache.size() should not fail in case of topology changes."
Revert "#ignite-732: IgniteCache.size() should not fail in case of topology changes."
This reverts commit 139aa270ae61494c0757867f2dc531ec7251b1da.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4bc9297
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4bc9297
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4bc9297
Branch: refs/heads/ignite-664
Commit: c4bc92974bace5e4cdb3ac9dc80790193e46d203
Parents: 281f4ef
Author: ivasilinets <iv...@gridgain.com>
Authored: Sat May 2 10:05:35 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Sat May 2 10:05:35 2015 +0300
----------------------------------------------------------------------
.../ignite/compute/ComputeTaskAdapter.java | 14 +-
.../processors/cache/GridCacheAdapter.java | 503 +++++++++----------
.../processors/cache/GridCacheProcessor.java | 109 ++--
3 files changed, 277 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index 87081fc..c2ad198 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,16 +24,15 @@ import java.util.*;
/**
* Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code ComputeTaskAdapter} can be used:
+ * how {@code GridComputeTaskAdapter} can be used:
* <pre name="code" class="java">
- * public class MyFooBarTask extends ComputeTaskAdapter<String, String> {
+ * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> {
* // Inject load balancer.
* @LoadBalancerResource
* ComputeLoadBalancer balancer;
*
* // Map jobs to grid nodes.
- * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg)
- * throws IgniteCheckedException {
+ * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws IgniteCheckedException {
* Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size());
*
* // In more complex cases, you can actually do
@@ -77,8 +76,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
* <p>
* If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}),
* then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance
- * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException},
- * which means that remote node either failed or job execution was rejected before it got a chance to start. In all
+ * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that
+ * remote node either failed or job execution was rejected before it got a chance to start. In all
* other cases the exception will be rethrown which will ultimately cause task to fail.
*
* @param res Received remote grid executable result.
@@ -88,8 +87,7 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> {
* @throws IgniteException If handling a job result caused an error effectively rejecting
* a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method.
*/
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
- throws IgniteException {
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
IgniteException e = res.getException();
// Try to failover if result is failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 39f19b1..3f4e97b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -75,9 +75,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Failed result. */
- private static final Object FAIL = new Integer(-1);
-
/** clearLocally() split threshold. */
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
@@ -885,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet() {
- return entrySet((CacheEntryPredicate[])null);
+ return entrySet((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@@ -900,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[])null);
+ return keySet((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return primaryKeySet((CacheEntryPredicate[])null);
+ return primaryKeySet((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@Override public Collection<V> values() {
- return values((CacheEntryPredicate[])null);
+ return values((CacheEntryPredicate[]) null);
}
/**
@@ -1083,31 +1080,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public void clear() throws IgniteCheckedException {
- clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
+ // Clear local cache synchronously.
+ clearLocally();
+
+ clearRemotes(0, new GlobalClearAllCallable(name()));
}
/** {@inheritDoc} */
@Override public void clear(K key) throws IgniteCheckedException {
- clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
- Collections.singleton(key)));
+ // Clear local cache synchronously.
+ clearLocally(key);
+
+ clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException {
- clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
- keys));
+ // Clear local cache synchronously.
+ clearLocallyAll(keys);
+
+ clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(K key) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
- Collections.singleton(key)));
+ return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) {
- return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(),
- keys));
+ return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
}
/**
@@ -1116,13 +1118,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param clearCall Global clear callable object.
* @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes.
*/
- private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException {
+ private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException {
try {
+ // Send job to remote nodes only.
+ Collection<ClusterNode> nodes =
+ ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
+
IgniteInternalFuture<Object> fut = null;
- ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+ if (!nodes.isEmpty()) {
+ ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
- fut = new ClearFuture(ctx, clearCall);
+ fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+ }
if (fut != null)
fut.get();
@@ -1141,18 +1149,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> clearAsync() {
- return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()));
+ return clearAsync(new GlobalClearAllCallable(name()));
}
/**
* @param clearCall Global clear callable object.
* @return Future.
*/
- private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall) {
+ private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
if (!nodes.isEmpty()) {
- IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall);
+ IgniteInternalFuture<Object> fut =
+ ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
@Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException {
@@ -2108,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2136,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
- Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
return tx.invokeAsync(ctx, invokeMap, args);
}
@@ -2362,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
@Override public String toString() {
@@ -2517,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
@Override public String toString() {
@@ -2906,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return tx.putAllAsync(ctx,
+ return (GridCacheReturn) tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3008,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.deploy().registerClass(val);
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ ctx.equalsValArray(val)).get().success();
}
@Override public String toString() {
@@ -3221,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
return txStart(
- concurrency,
- isolation,
- cfg.getDefaultTxTimeout(),
- 0
+ concurrency,
+ isolation,
+ cfg.getDefaultTxTimeout(),
+ 0
);
}
@@ -3567,7 +3576,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0);
- return new SizeFuture(peekModes, ctx, modes.near);
+ IgniteInternalFuture<Collection<Integer>> fut =
+ ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
+
+ return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() {
+ @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut)
+ throws IgniteCheckedException {
+ Collection<Integer> res = fut.get();
+
+ int totalSize = 0;
+
+ for (Integer size : res)
+ totalSize += size;
+
+ return totalSize;
+ }
+ });
}
/** {@inheritDoc} */
@@ -3651,7 +3675,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return F.iterator(iterator(),
new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
private IgniteCacheExpiryPolicy expiryPlc =
- ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+ ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
@Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3885,6 +3909,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Gets cache global size (with or without backups).
+ *
+ * @param primaryOnly {@code True} if only primary sizes should be included.
+ * @return Global size.
+ * @throws IgniteCheckedException If internal task execution failed.
+ */
+ private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
+ try {
+ // Send job to remote nodes only.
+ Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
+
+ IgniteInternalFuture<Collection<Integer>> fut = null;
+
+ if (!nodes.isEmpty()) {
+ ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
+
+ fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes);
+ }
+
+ // Get local value.
+ int globalSize = primaryOnly ? primarySize() : size();
+
+ if (fut != null) {
+ for (Integer i : fut.get())
+ globalSize += i;
+ }
+
+ return globalSize;
+ }
+ catch (ClusterGroupEmptyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
+
+ return primaryOnly ? primarySize() : size();
+ }
+ catch (ComputeTaskTimeoutCheckedException e) {
+ U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
+ "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+ throw e;
+ }
+ }
+
+ /**
* @param op Cache operation.
* @param <T> Return type.
* @return Operation result.
@@ -4825,10 +4893,47 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Internal callable which performs clear operation on a cache with the given name.
+ */
+ @GridInternal
+ private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable {
+ /** Cache name. */
+ protected String cacheName;
+
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public GlobalClearCallable() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ protected GlobalClearCallable(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, cacheName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cacheName = U.readString(in);
+ }
+ }
+
+ /**
* Global clear all.
*/
@GridInternal
- private static class GlobalClearAllCallable extends TopologyVersionAwareCallable {
+ private static class GlobalClearAllCallable extends GlobalClearCallable {
/** */
private static final long serialVersionUID = 0L;
@@ -4841,30 +4946,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param cacheName Cache name.
- * @param topVer Affinity topology version.
*/
- private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) {
- super(cacheName, topVer);
+ private GlobalClearAllCallable(String cacheName) {
+ super(cacheName);
}
/** {@inheritDoc} */
- @Override protected Object callLocal() {
+ @Override public Object call() throws Exception {
((IgniteEx)ignite).cachex(cacheName).clearLocally();
return null;
}
-
- /** {@inheritDoc} */
- @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
- return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
- }
}
/**
* Global clear keys.
*/
@GridInternal
- private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable {
+ private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable {
/** */
private static final long serialVersionUID = 0L;
@@ -4880,25 +4979,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param cacheName Cache name.
- * @param topVer Affinity topology version.
* @param keys Keys to clear.
*/
- protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
- super(cacheName, topVer);
+ private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) {
+ super(cacheName);
this.keys = keys;
}
/** {@inheritDoc} */
- @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
- return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes();
+ @Override public Object call() throws Exception {
+ ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+
+ return null;
}
/** {@inheritDoc} */
- @Override protected Object callLocal() {
- ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
- return null;
+ out.writeObject(keys);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ keys = (Set<K>) in.readObject();
}
}
@@ -4906,202 +5013,127 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* Internal callable for global size calculation.
*/
@GridInternal
- private static class GlobalSizeCallable extends TopologyVersionAwareCallable {
+ private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
+ /** Cache name. */
+ private String cacheName;
+
/** Peek modes. */
private CachePeekMode[] peekModes;
- /** Near enable. */
- private boolean nearEnable;
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
/**
* Required by {@link Externalizable}.
*/
- public GlobalSizeCallable() {
+ public SizeCallable() {
// No-op.
}
/**
* @param cacheName Cache name.
- * @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
*/
- private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) {
- super(cacheName, topVer);
-
+ private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
+ this.cacheName = cacheName;
this.peekModes = peekModes;
- this.nearEnable = nearEnable;
- }
-
- /** {@inheritDoc} */
- @Override protected Object callLocal() {
- try {
- IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
- return cache == null ? 0 : cache.localSize(peekModes);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
}
/** {@inheritDoc} */
- @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) {
- IgniteClusterEx cluster = ctx.grid().cluster();
+ @Override public Integer applyx(Object o) throws IgniteCheckedException {
+ IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
- ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name());
+ assert cache != null : cacheName;
- return grp.nodes();
+ return cache.localSize(peekModes);
}
/** {@inheritDoc} */
- public String toString() {
- return S.toString(GlobalSizeCallable.class, this);
- }
- }
-
- /**
- * Cache size future.
- */
- private static class SizeFuture extends RetryFuture {
- /** Size. */
- private int size = 0;
-
- /**
- * @param peekModes Peek modes.
- */
- public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) {
- super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near));
- }
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, cacheName);
- /** {@inheritDoc} */
- @Override protected void onInit() {
- size = 0;
- }
+ out.writeInt(peekModes.length);
- /** {@inheritDoc} */
- @Override protected void onLocal(Object localRes) {
- size += (Integer)localRes;
+ for (int i = 0; i < peekModes.length; i++)
+ U.writeEnum(out, peekModes[i]);
}
/** {@inheritDoc} */
- @Override protected void allDone() {
- onDone(size);
- }
- }
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cacheName = U.readString(in);
- /**
- * Cache clear future.
- */
- private static class ClearFuture extends RetryFuture {
- /**
- */
- public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall) {
- super(ctx, clearCall);
- }
+ int len = in.readInt();
- /** {@inheritDoc} */
- @Override protected void onInit() {
- // No-op.
- }
+ peekModes = new CachePeekMode[len];
- /** {@inheritDoc} */
- @Override protected void onLocal(Object localRes) {
- // No-op.
+ for (int i = 0; i < len; i++)
+ peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
}
/** {@inheritDoc} */
- @Override protected void allDone() {
- onDone();
+ public String toString() {
+ return S.toString(SizeCallable.class, this);
}
}
/**
- * Retry future.
+ * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
+ * operation on a cache with the given name.
*/
- protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> {
- /** Context. */
- private final GridCacheContext ctx;
+ @GridInternal
+ private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** Callable. */
- private final TopologyVersionAwareCallable call;
+ /** Cache name. */
+ private String cacheName;
- /** Max retries count before issuing an error. */
- private volatile int retries = 32;
+ /** Primary only flag. */
+ private boolean primaryOnly;
+
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
/**
+ * Empty constructor for serialization.
*/
- public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) {
- this.ctx = ctx;
- this.call = call;
-
- init();
+ public GlobalSizeCallable() {
+ // No-op.
}
/**
- * Init.
+ * @param cacheName Cache name.
+ * @param primaryOnly Primary only flag.
*/
- private void init() {
- Collection<ClusterNode> nodes = call.nodes(ctx);
-
- call.topologyVersion(ctx.affinity().affinityTopologyVersion());
-
- IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST,
- F.asSet((Callable<Object>)call), nodes, true);
-
- fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() {
- @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) {
- try {
- Collection<Object> res = fut.get();
-
- onInit();
-
- for (Object locRes : res) {
- if (locRes == FAIL) {
- if (retries-- > 0)
- init();
- else {
- onDone(new ClusterTopologyException("Failed to wait topology."));
-
- return;
- }
- }
+ private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
+ this.cacheName = cacheName;
+ this.primaryOnly = primaryOnly;
+ }
- onLocal(locRes);
- }
+ /** {@inheritDoc} */
+ @Override public Integer apply(Object o) {
+ IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
- allDone();
- }
- catch (IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyException.class)) {
- if (retries-- > 0)
- init();
- else
- onDone(e);
- }
- else
- onDone(e);
- }
- }
- });
+ return primaryOnly ? cache.primarySize() : cache.size();
}
- /**
- * Init reducer.
- */
- protected abstract void onInit();
-
- /**
- * @param localRes Add local result to global result.
- */
- protected abstract void onLocal(Object localRes);
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, cacheName);
+ out.writeBoolean(primaryOnly);
+ }
- /**
- * On done.
- */
- protected abstract void allDone();
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cacheName = U.readString(in);
+ primaryOnly = in.readBoolean();
+ }
}
/**
@@ -5665,89 +5697,4 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
}
}
-
- /**
- * Delayed callable class.
- */
- protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- protected Ignite ignite;
-
- /** Affinity topology version. */
- protected AffinityTopologyVersion topVer;
-
- /** Cache name. */
- protected String cacheName;
-
- /**
- * Empty constructor for serialization.
- */
- public TopologyVersionAwareCallable() {
- // No-op.
- }
-
- /**
- * @param topVer Affinity topology version.
- */
- public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) {
- this.cacheName = cacheName;
- this.topVer = topVer;
- }
-
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- if (!compareTopologyVersions())
- return FAIL;
-
- Object res = callLocal();
-
- if (!compareTopologyVersions())
- return FAIL;
- else
- return res;
- }
-
- /**
- * Call local.
- *
- * @return Local result.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract Object callLocal() throws IgniteCheckedException;
-
- /**
- * @param ctx Grid cache context.
- * @return Nodes to call.
- */
- protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx);
-
- /**
- * Compare topology versions.
- */
- public boolean compareTopologyVersions() {
- GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache();
-
- GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName);
-
- if (cacheAdapter == null)
- return false;
-
- final GridCacheContext<K, V> ctx = cacheAdapter.context();
-
- AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion();
-
- return locTopVer.compareTo(topVer) == 0;
- }
-
- /**
- * @param topVer Affinity topology version.
- */
- public void topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 77fa104..c0026ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,9 +124,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Must use JDK marshaller since it is used by discovery to fire custom events. */
private Marshaller marshaller = new JdkMarshaller();
- /** Count down latch for caches. */
- private CountDownLatch cacheStartedLatch = new CountDownLatch(1);
-
/**
* @param ctx Kernal context.
*/
@@ -660,92 +657,87 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStart() throws IgniteCheckedException {
- try {
- if (ctx.config().isDaemon())
- return;
+ if (ctx.config().isDaemon())
+ return;
- ClusterNode locNode = ctx.discovery().localNode();
+ ClusterNode locNode = ctx.discovery().localNode();
- // Init cache plugin managers.
- final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+ // Init cache plugin managers.
+ final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration locCcfg = desc.cacheConfiguration();
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration locCcfg = desc.cacheConfiguration();
- CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+ CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
- cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
- }
+ cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+ }
- if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
- for (ClusterNode n : ctx.discovery().remoteNodes()) {
- checkTransactionConfiguration(n);
+ if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+ for (ClusterNode n : ctx.discovery().remoteNodes()) {
+ checkTransactionConfiguration(n);
- DeploymentMode locDepMode = ctx.config().getDeploymentMode();
- DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+ DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+ DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
- CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
+ CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
- if (rmtCfg != null) {
- CacheConfiguration locCfg = desc.cacheConfiguration();
+ if (rmtCfg != null) {
+ CacheConfiguration locCfg = desc.cacheConfiguration();
- checkCache(locCfg, rmtCfg, n);
+ checkCache(locCfg, rmtCfg, n);
- // Check plugin cache configurations.
- CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+ // Check plugin cache configurations.
+ CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- pluginMgr.validateRemotes(rmtCfg, n);
- }
+ pluginMgr.validateRemotes(rmtCfg, n);
}
}
}
+ }
- // Start dynamic caches received from collect discovery data.
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- boolean started = desc.onStart();
+ // Start dynamic caches received from collect discovery data.
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ boolean started = desc.onStart();
- assert started : "Failed to change started flag for locally configured cache: " + desc;
+ assert started : "Failed to change started flag for locally configured cache: " + desc;
- desc.clearRemoteConfigurations();
+ desc.clearRemoteConfigurations();
- CacheConfiguration ccfg = desc.cacheConfiguration();
+ CacheConfiguration ccfg = desc.cacheConfiguration();
- IgnitePredicate filter = ccfg.getNodeFilter();
+ IgnitePredicate filter = ccfg.getNodeFilter();
- if (filter.apply(locNode)) {
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+ if (filter.apply(locNode)) {
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+ CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
- assert pluginMgr != null : " Map=" + cache2PluginMgr;
+ assert pluginMgr != null : " Map=" + cache2PluginMgr;
- GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+ GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
- ctx.dynamicDeploymentId(desc.deploymentId());
+ ctx.dynamicDeploymentId(desc.deploymentId());
- sharedCtx.addCacheContext(ctx);
+ sharedCtx.addCacheContext(ctx);
- GridCacheAdapter cache = ctx.cache();
+ GridCacheAdapter cache = ctx.cache();
- String name = ccfg.getName();
+ String name = ccfg.getName();
- caches.put(maskNull(name), cache);
+ caches.put(maskNull(name), cache);
- startCache(cache);
+ startCache(cache);
- jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
- }
+ jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
}
}
- finally {
- cacheStartedLatch.countDown();
- }
ctx.marshallerContext().onMarshallerCacheStarted(ctx);
@@ -843,8 +835,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void onKernalStop(boolean cancel) {
- cacheStartedLatch.countDown();
-
if (ctx.config().isDaemon())
return;
@@ -2696,13 +2686,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Getting internal cache adapter: " + name);
- try {
- cacheStartedLatch.await();
- }
- catch (InterruptedException e) {
- throw new IgniteException("Failed to wait starting caches.");
- }
-
return (GridCacheAdapter<K, V>)caches.get(maskNull(name));
}