You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/17 12:45:09 UTC
ignite git commit: ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse.
Repository: ignite
Updated Branches:
refs/heads/ignite-1537 0bc2e3233 -> 9f40fae17
ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f40fae1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f40fae1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f40fae1
Branch: refs/heads/ignite-1537
Commit: 9f40fae17d4c1679b79c080d7df2091b08c2c333
Parents: 0bc2e32
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 17 14:44:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 17 14:44:59 2015 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 8 +-
.../ignite/internal/TaskNodeRestartTest.java | 225 +++++++++++++++++++
.../IgniteCacheSizeFailoverTest.java | 2 +-
.../testsuites/IgniteComputeGridTestSuite.java | 2 +
4 files changed, 234 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 9315d7c..59d3f90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,8 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
- selfOccupied = true;
+ res = delayedRess.poll();
+ // We can not return here because there can be more delayed messages in the queue.
continue;
}
@@ -708,7 +709,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
log.debug("Received redundant response for a job (will ignore): " + res);
- return;
+ res = delayedRess.poll();
+
+ // We can not return here because there can be more delayed messages in the queue.
+ continue;
}
if (!jobRes.getNode().id().equals(res.getNodeId())) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
new file mode 100644
index 0000000..323bfaf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class TaskNodeRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTaskNodeRestart() throws Exception {
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ final AtomicInteger stopIdx = new AtomicInteger();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx = stopIdx.getAndIncrement();
+
+ int node = NODES + idx;
+
+ while (!finished.get()) {
+ log.info("Start node: " + node);
+
+ startGrid(node);
+
+ U.sleep(300);
+
+ log.info("Stop node: " + node);
+
+ stopGrid(node);
+ }
+
+ return null;
+ }
+ }, 2, "stop-thread");
+
+ try {
+ final long stopTime = System.currentTimeMillis() + 60_000;
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int node = idx.getAndIncrement() % NODES;
+
+ Ignite ignite = ignite(node);
+
+ log.info("Start thread: " + ignite.name());
+
+ IgniteCompute compute = ignite.compute();
+
+ while (U.currentTimeMillis() < stopTime) {
+ try {
+ compute.broadcast(new TestCallable());
+
+ compute.call(new TestCallable());
+
+ compute.execute(new TestTask1(), null);
+
+ compute.execute(new TestTask2(), null);
+ }
+ catch (IgniteException e) {
+ log.info("Error: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 20, "test-thread");
+
+ fut.get(90_000);
+
+ finished.set(true);
+
+ restartFut.get();
+ }
+ finally {
+ finished.set(true);
+
+ restartFut.get(5000);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTask1 extends ComputeTaskAdapter<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+ throws IgniteException {
+ Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new TestJob(), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTask2 implements ComputeTask<Void, Void> {
+ /** {@inheritDoc} */
+ @Nullable public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+ throws IgniteException {
+ Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new TestJob(), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestJob extends ComputeJobAdapter {
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCallable implements IgniteCallable<Void> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Void call() throws Exception {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
index 1738a0d..8513620 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
@@ -97,7 +97,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest {
return null;
}
- }, 2, "size-thread");
+ }, 10, "size-thread");
try {
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f40fae1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 23f2edc..e2c7e26 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest;
import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest;
import org.apache.ignite.internal.IgniteExecutorServiceTest;
import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest;
+import org.apache.ignite.internal.TaskNodeRestartTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
@@ -144,6 +145,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class);
suite.addTestSuite(IgniteComputeTopologyExceptionTest.class);
suite.addTestSuite(GridTaskFailoverAffinityRunTest.class);
+ suite.addTestSuite(TaskNodeRestartTest.class);
return suite;
}