You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/02 11:31:19 UTC
[42/50] [abbrv] ignite git commit: IGNITE-4717 Fixed hangs in
VisorCacheClearTask.
IGNITE-4717 Fixed hangs in VisorCacheClearTask.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76f30604
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76f30604
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76f30604
Branch: refs/heads/ignite-4565-ddl
Commit: 76f30604be3e4e724f3741d3d21545d4f89d2e49
Parents: 50620a7
Author: Andrey Novikov <an...@gridgain.com>
Authored: Mon Feb 20 18:23:33 2017 +0700
Committer: Andrey Novikov <an...@gridgain.com>
Committed: Mon Feb 20 18:23:33 2017 +0700
----------------------------------------------------------------------
.../visor/cache/VisorCacheClearTask.java | 88 +++++---------------
.../visor/compute/VisorGatewayTask.java | 30 ++++++-
2 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/76f30604/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 1f1a6fb..0c8476f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.visor.cache;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.JobContextResource;
@@ -90,17 +88,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
}
/**
- * @param subJob Sub job to execute asynchronously.
+ * @param fut Future for asynchronous cache operation.
* @param idx Index.
* @return {@code true} If subJob was not completed and this job should be suspended.
*/
- private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
- IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
-
- compute.call(subJob);
-
- IgniteFuture<Integer> fut = compute.future();
-
+ private boolean callAsync(IgniteFuture<Integer> fut, int idx) {
futs[idx] = fut;
if (fut.isDone())
@@ -119,16 +111,28 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
futs = new IgniteFuture[3];
if (futs[0] == null || futs[1] == null || futs[2] == null) {
- IgniteCache cache = ignite.cache(cacheName);
+ IgniteCache cache = ignite.cache(cacheName).withAsync();
+
+ if (futs[0] == null) {
+ cache.size(CachePeekMode.PRIMARY);
+
+ if (callAsync(cache.<Integer>future(), 0))
+ return null;
+ }
- if (futs[0] == null && callAsync(new VisorCacheSizeCallable(cache), 0))
- return null;
+ if (futs[1] == null) {
+ cache.clear();
- if (futs[1] == null && callAsync(new VisorCacheClearCallable(cache), 1))
- return null;
+ if (callAsync(cache.<Integer>future(), 1))
+ return null;
+ }
+
+ if (futs[2] == null) {
+ cache.size(CachePeekMode.PRIMARY);
- if (futs[2] == null && callAsync(new VisorCacheSizeCallable(cache), 2))
- return null;
+ if (callAsync(cache.<Integer>future(), 2))
+ return null;
+ }
}
assert futs[0].isDone() && futs[1].isDone() && futs[2].isDone();
@@ -141,54 +145,4 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
return S.toString(VisorCacheClearJob.class, this);
}
}
-
- /**
- * Callable to get cache size.
- */
- @GridInternal
- private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteCache cache;
-
- /**
- * @param cache Cache to take size from.
- */
- private VisorCacheSizeCallable(IgniteCache cache) {
- this.cache = cache;
- }
-
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- return cache.size(CachePeekMode.PRIMARY);
- }
- }
-
- /**
- * Callable to clear cache.
- */
- @GridInternal
- private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteCache cache;
-
- /**
- * @param cache Cache to clear.
- */
- private VisorCacheClearCallable(IgniteCache cache) {
- this.cache = cache;
- }
-
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- cache.clear();
-
- return 0;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/76f30604/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 2539a26..a64ec6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@ -29,21 +29,26 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -101,9 +106,16 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
@IgniteInstanceResource
protected transient IgniteEx ignite;
+ /** Auto-inject job context. */
+ @JobContextResource
+ protected transient ComputeJobContext jobCtx;
+
/** Arguments count. */
private final int argsCnt;
+ /** Future for spawned task. */
+ private transient IgniteFuture fut;
+
/**
* Create job with specified argument.
*
@@ -284,6 +296,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Object execute() throws IgniteException {
+ if (fut != null)
+ return fut.get();
+
String nidsArg = argument(0);
String taskName = argument(1);
@@ -355,8 +370,19 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
}
}
- return ignite.compute(ignite.cluster().forNodeIds(nids))
- .execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+ IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync();
+
+ comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+
+ fut = comp.future();
+
+ fut.listen(new CI1<IgniteFuture<Object>>() {
+ @Override public void apply(IgniteFuture<Object> f) {
+ jobCtx.callcc();
+ }
+ });
+
+ return jobCtx.holdcc();
}
}
}