You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/09 10:05:33 UTC
[10/12] ignite git commit: IGNITE-4676 Fixed hang if closure executed
nested internal task with continuation. Added test.
IGNITE-4676 Fixed hang if closure executed nested internal task with continuation. Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7a53079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7a53079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7a53079
Branch: refs/heads/ignite-4436-2
Commit: e7a5307911dff1df93d003b2ef98aeca96a89dac
Parents: db5da76
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Thu Feb 9 16:44:41 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Feb 9 16:44:41 2017 +0700
----------------------------------------------------------------------
.../internal/processors/job/GridJobWorker.java | 4 +
.../internal/GridContinuousTaskSelfTest.java | 79 ++++++++++++++++++++
2 files changed, 83 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7a53079/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 6a00d96..acefde7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -617,6 +617,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
+ else
+ // Make sure flag is not set for current thread.
+ // This may happen in case of nested internal task call with continuation.
+ HOLD.set(false);
ctx.job().currentTaskSession(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7a53079/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index 88667d9..d224fc5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -21,10 +21,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
@@ -43,7 +45,9 @@ import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -51,6 +55,7 @@ import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.jetbrains.annotations.Nullable;
/**
* Continuous task test.
@@ -195,6 +200,80 @@ public class GridContinuousTaskSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @throws Exception If test failed.
+ */
+ public void testClosureWithNestedInternalTask() throws Exception {
+ try {
+ IgniteEx ignite = startGrid(0);
+
+ ComputeTaskInternalFuture<String> fut = ignite.context().closure().callAsync(GridClosureCallMode.BALANCE, new Callable<String>() {
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx g;
+
+ @Override public String call() throws Exception {
+ return g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
+ }
+ }, ignite.cluster().nodes());
+
+ assertEquals("DONE", fut.get(3000));
+ }
+ finally {
+ stopGrid(0, true);
+ }
+ }
+
+ /** Test task with continuation. */
+ @GridInternal
+ public static class NestedHoldccTask extends ComputeTaskAdapter<String, String> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable String arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> map = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ map.put(new NestedHoldccJob(), node);
+
+ return map;
+
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String reduce(List<ComputeJobResult> results) throws IgniteException {
+ return results.get(0).getData();
+ }
+ }
+
+ /** Test job. */
+ public static class NestedHoldccJob extends ComputeJobAdapter {
+ /** */
+ @JobContextResource
+ private ComputeJobContext jobCtx;
+
+ /** */
+ private int cnt = 0;
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ if (cnt < 1) {
+ cnt++;
+
+ jobCtx.holdcc();
+
+ new Timer().schedule(new TimerTask() {
+ @Override public void run() {
+ jobCtx.callcc();
+ }
+ }, 500);
+
+ return "NOT DONE";
+ }
+
+ return "DONE";
+ }
+ }
+
/** */
@SuppressWarnings({"PublicInnerClass"})
public static class TestMultipleHoldccCallsClosure implements IgniteClosure<Object, Boolean> {