You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/17 12:45:09 UTC

ignite git commit: ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 0bc2e3233 -> 9f40fae17


ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f40fae1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f40fae1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f40fae1

Branch: refs/heads/ignite-1537
Commit: 9f40fae17d4c1679b79c080d7df2091b08c2c333
Parents: 0bc2e32
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 17 14:44:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 17 14:44:59 2015 +0300

----------------------------------------------------------------------
 .../processors/task/GridTaskWorker.java         |   8 +-
 .../ignite/internal/TaskNodeRestartTest.java    | 225 +++++++++++++++++++
 .../IgniteCacheSizeFailoverTest.java            |   2 +-
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 4 files changed, 234 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 9315d7c..59d3f90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,8 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         if (log.isDebugEnabled())
                             U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
 
-                        selfOccupied = true;
+                        res = delayedRess.poll();
 
+                        // We can not return here because there can be more delayed messages in the queue.
                         continue;
                     }
 
@@ -708,7 +709,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         if (log.isDebugEnabled())
                             log.debug("Received redundant response for a job (will ignore): " + res);
 
-                        return;
+                        res = delayedRess.poll();
+
+                        // We can not return here because there can be more delayed messages in the queue.
+                        continue;
                     }
 
                     if (!jobRes.getNode().id().equals(res.getNodeId())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
new file mode 100644
index 0000000..323bfaf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class TaskNodeRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskNodeRestart() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final AtomicInteger stopIdx = new AtomicInteger();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx = stopIdx.getAndIncrement();
+
+                int node = NODES + idx;
+
+                while (!finished.get()) {
+                    log.info("Start node: " + node);
+
+                    startGrid(node);
+
+                    U.sleep(300);
+
+                    log.info("Stop node: " + node);
+
+                    stopGrid(node);
+                }
+
+                return null;
+            }
+        }, 2, "stop-thread");
+
+        try {
+            final long stopTime = System.currentTimeMillis() + 60_000;
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int node = idx.getAndIncrement() % NODES;
+
+                    Ignite ignite = ignite(node);
+
+                    log.info("Start thread: " + ignite.name());
+
+                    IgniteCompute compute = ignite.compute();
+
+                    while (U.currentTimeMillis() < stopTime) {
+                        try {
+                            compute.broadcast(new TestCallable());
+
+                            compute.call(new TestCallable());
+
+                            compute.execute(new TestTask1(), null);
+
+                            compute.execute(new TestTask2(), null);
+                        }
+                        catch (IgniteException e) {
+                            log.info("Error: " + e);
+                        }
+                    }
+
+                    return null;
+                }
+            }, 20, "test-thread");
+
+            fut.get(90_000);
+
+            finished.set(true);
+
+            restartFut.get();
+        }
+        finally {
+            finished.set(true);
+
+            restartFut.get(5000);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestTask1 extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+            throws IgniteException {
+            Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new TestJob(), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestTask2 implements ComputeTask<Void, Void> {
+        /** {@inheritDoc} */
+        @Nullable public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+            throws IgniteException {
+            Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new TestJob(), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestJob extends ComputeJobAdapter {
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCallable implements IgniteCallable<Void> {
+        /** {@inheritDoc} */
+        @Nullable @Override public Void call() throws Exception {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
index 1738a0d..8513620 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
@@ -97,7 +97,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest {
 
                 return null;
             }
-        }, 2, "size-thread");
+        }, 10, "size-thread");
 
         try {
             for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 23f2edc..e2c7e26 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest;
 import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest;
 import org.apache.ignite.internal.IgniteExecutorServiceTest;
 import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest;
+import org.apache.ignite.internal.TaskNodeRestartTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
@@ -144,6 +145,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class);
         suite.addTestSuite(IgniteComputeTopologyExceptionTest.class);
         suite.addTestSuite(GridTaskFailoverAffinityRunTest.class);
+        suite.addTestSuite(TaskNodeRestartTest.class);
 
         return suite;
     }