You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/13 11:30:05 UTC

[25/40] ignite git commit: IGNITE-4676 Fixed hang if closure executed nested internal task with continuation. Added test.

IGNITE-4676 Fixed hang if closure executed nested internal task with continuation. Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7a53079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7a53079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7a53079

Branch: refs/heads/ignite-2.0
Commit: e7a5307911dff1df93d003b2ef98aeca96a89dac
Parents: db5da76
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Feb 9 16:44:41 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Feb 9 16:44:41 2017 +0700

----------------------------------------------------------------------
 .../internal/processors/job/GridJobWorker.java  |  4 +
 .../internal/GridContinuousTaskSelfTest.java    | 79 ++++++++++++++++++++
 2 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7a53079/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 6a00d96..acefde7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -617,6 +617,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 // Finish here only if not held by this thread.
                 if (!HOLD.get())
                     finishJob(res, ex, sndRes);
+                else
+                    // Make sure flag is not set for current thread.
+                    // This may happen in case of nested internal task call with continuation.
+                    HOLD.set(false);
 
                 ctx.job().currentTaskSession(null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7a53079/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index 88667d9..d224fc5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -21,10 +21,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
@@ -43,7 +45,9 @@ import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
 import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -51,6 +55,7 @@ import org.apache.ignite.resources.TaskSessionResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous task test.
@@ -195,6 +200,80 @@ public class GridContinuousTaskSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testClosureWithNestedInternalTask() throws Exception {
+        try {
+            IgniteEx ignite = startGrid(0);
+
+            ComputeTaskInternalFuture<String> fut = ignite.context().closure().callAsync(GridClosureCallMode.BALANCE, new Callable<String>() {
+                /** */
+                @IgniteInstanceResource
+                private IgniteEx g;
+
+                @Override public String call() throws Exception {
+                    return g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
+                }
+            }, ignite.cluster().nodes());
+
+            assertEquals("DONE", fut.get(3000));
+        }
+        finally {
+            stopGrid(0, true);
+        }
+    }
+
+    /** Test task with continuation. */
+    @GridInternal
+    public static class NestedHoldccTask extends ComputeTaskAdapter<String, String> {
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable String arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                map.put(new NestedHoldccJob(), node);
+
+            return map;
+
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String reduce(List<ComputeJobResult> results) throws IgniteException {
+            return results.get(0).getData();
+        }
+    }
+
+    /** Test job. */
+    public static class NestedHoldccJob extends ComputeJobAdapter {
+        /** */
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** */
+        private int cnt = 0;
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            if (cnt < 1) {
+                cnt++;
+
+                jobCtx.holdcc();
+
+                new Timer().schedule(new TimerTask() {
+                    @Override public void run() {
+                        jobCtx.callcc();
+                    }
+                }, 500);
+
+                return "NOT DONE";
+            }
+
+            return "DONE";
+        }
+    }
+
     /** */
     @SuppressWarnings({"PublicInnerClass"})
     public static class TestMultipleHoldccCallsClosure implements IgniteClosure<Object, Boolean> {