You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2017/03/17 00:20:18 UTC

[10/19] ignite git commit: IGNITE-4717 Fixed hangs in VisorCacheClearTask. (cherry picked from commit 76f3060)

IGNITE-4717 Fixed hangs in VisorCacheClearTask.
(cherry picked from commit 76f3060)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a30183ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a30183ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a30183ac

Branch: refs/heads/ignite-1192
Commit: a30183ac821507fbdaa6f0cc2c6ef25ca2677867
Parents: 5736247
Author: Andrey Novikov <an...@gridgain.com>
Authored: Mon Feb 20 18:23:33 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 10:47:15 2017 +0700

----------------------------------------------------------------------
 .../visor/cache/VisorCacheClearTask.java        | 88 +++++---------------
 .../visor/compute/VisorGatewayTask.java         | 30 ++++++-
 2 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 1f1a6fb..0c8476f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.visor.cache;
 
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.JobContextResource;
@@ -90,17 +88,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
         }
 
         /**
-         * @param subJob Sub job to execute asynchronously.
+         * @param fut Future for asynchronous cache operation.
          * @param idx Index.
          * @return {@code true} If subJob was not completed and this job should be suspended.
          */
-        private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
-            IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
-
-            compute.call(subJob);
-
-            IgniteFuture<Integer> fut = compute.future();
-
+        private boolean callAsync(IgniteFuture<Integer> fut, int idx) {
             futs[idx] = fut;
 
             if (fut.isDone())
@@ -119,16 +111,28 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
                 futs = new IgniteFuture[3];
 
             if (futs[0] == null || futs[1] == null || futs[2] == null) {
-                IgniteCache cache = ignite.cache(cacheName);
+                IgniteCache cache = ignite.cache(cacheName).withAsync();
+
+                if (futs[0] == null) {
+                    cache.size(CachePeekMode.PRIMARY);
+
+                    if (callAsync(cache.<Integer>future(), 0))
+                        return null;
+                }
 
-                if (futs[0] == null && callAsync(new VisorCacheSizeCallable(cache), 0))
-                    return null;
+                if (futs[1] == null) {
+                    cache.clear();
 
-                if (futs[1] == null && callAsync(new VisorCacheClearCallable(cache), 1))
-                    return null;
+                    if (callAsync(cache.<Integer>future(), 1))
+                        return null;
+                }
+                
+                if (futs[2] == null) {
+                    cache.size(CachePeekMode.PRIMARY);
 
-                if (futs[2] == null && callAsync(new VisorCacheSizeCallable(cache), 2))
-                    return null;
+                    if (callAsync(cache.<Integer>future(), 2))
+                        return null;
+                }
             }
 
             assert futs[0].isDone() && futs[1].isDone() && futs[2].isDone();
@@ -141,54 +145,4 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
             return S.toString(VisorCacheClearJob.class, this);
         }
     }
-
-    /**
-     * Callable to get cache size.
-     */
-    @GridInternal
-    private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteCache cache;
-
-        /**
-         * @param cache Cache to take size from.
-         */
-        private VisorCacheSizeCallable(IgniteCache cache) {
-            this.cache = cache;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            return cache.size(CachePeekMode.PRIMARY);
-        }
-    }
-
-    /**
-     * Callable to clear cache.
-     */
-    @GridInternal
-    private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteCache cache;
-
-        /**
-         * @param cache Cache to clear.
-         */
-        private VisorCacheClearCallable(IgniteCache cache) {
-            this.cache = cache;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            cache.clear();
-
-            return 0;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 2539a26..a64ec6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@ -29,21 +29,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -101,9 +106,16 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
         @IgniteInstanceResource
         protected transient IgniteEx ignite;
 
+        /** Auto-inject job context. */
+        @JobContextResource
+        protected transient ComputeJobContext jobCtx;
+
         /** Arguments count. */
         private final int argsCnt;
 
+        /** Future for spawned task. */
+        private transient IgniteFuture fut;
+
         /**
          * Create job with specified argument.
          *
@@ -284,6 +296,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public Object execute() throws IgniteException {
+            if (fut != null)
+                return fut.get();
+
             String nidsArg = argument(0);
             String taskName = argument(1);
 
@@ -355,8 +370,19 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
                 }
             }
 
-            return ignite.compute(ignite.cluster().forNodeIds(nids))
-                .execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+            IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync();
+            
+            comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+
+            fut = comp.future();
+
+            fut.listen(new CI1<IgniteFuture<Object>>() {
+                @Override public void apply(IgniteFuture<Object> f) {
+                    jobCtx.callcc();
+                }
+            });
+
+            return jobCtx.holdcc();
         }
     }
 }