You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mailing-list archive page.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/10/04 17:05:28 UTC
ignite git commit: IGNITE-9744 Fix SYSTEM_WORKER_TERMINATION detection in general case - Fixes #4876.
Repository: ignite
Updated Branches:
refs/heads/master c59791196 -> a6d0bd4b5
IGNITE-9744 Fix SYSTEM_WORKER_TERMINATION detection in general case - Fixes #4876.
Signed-off-by: Ivan Rakov <ir...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6d0bd4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6d0bd4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6d0bd4b
Branch: refs/heads/master
Commit: a6d0bd4b5551231b0516eaf0ebd8112b45bba86d
Parents: c597911
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Thu Oct 4 19:45:07 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Thu Oct 4 19:45:07 2018 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 3 +
.../ignite/internal/util/nio/GridNioServer.java | 6 ++
.../ignite/internal/util/worker/GridWorker.java | 4 +-
.../ignite/internal/worker/WorkersRegistry.java | 3 +
.../failure/SystemWorkersTerminationTest.java | 72 ++++----------------
.../ignite/failure/TestFailureHandler.java | 10 +--
6 files changed, 35 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4c254b0..cc1fd33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2548,6 +2548,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ else
+ // In case of reconnectNeeded == true, prevent general-case termination handling.
+ cancel();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e4c96b4..f9fda8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1830,6 +1830,9 @@ public class GridNioServer<T> {
}
else if (err != null)
lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+ else
+ // In case of closed == true, prevent general-case termination handling.
+ cancel();
}
}
@@ -2906,6 +2909,9 @@ public class GridNioServer<T> {
lsnr.onFailure(CRITICAL_ERROR, err);
else if (err != null)
lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+ else
+ // In case of closed == true, prevent general-case termination handling.
+ cancel();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 3d9163d..3f779da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -99,12 +99,12 @@ public abstract class GridWorker implements Runnable {
/** {@inheritDoc} */
@Override public final void run() {
+ updateHeartbeat();
+
// Runner thread must be recorded first as other operations
// may depend on it being present.
runner = Thread.currentThread();
- updateHeartbeat();
-
if (log.isDebugEnabled())
log.debug("Grid runnable started: " + name);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
index 55740a4..848bb59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
@@ -143,6 +143,9 @@ public class WorkersRegistry implements GridWorkerListener {
/** {@inheritDoc} */
@Override public void onStopped(GridWorker w) {
+ if (!w.isCancelled())
+ workerFailedHnd.apply(w, SYSTEM_WORKER_TERMINATION);
+
unregister(w.runner().getName());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
index 638e6f1..1cebe22 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
@@ -17,19 +17,15 @@
package org.apache.ignite.failure;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
@@ -37,11 +33,8 @@ import org.apache.ignite.thread.IgniteThread;
* Tests system critical workers termination.
*/
public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
- /** Handler latch. */
- private static volatile CountDownLatch hndLatch;
-
/** */
- private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+ private static volatile String failureHndThreadName;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -58,8 +51,6 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(dsCfg);
- cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
-
return cfg;
}
@@ -84,62 +75,28 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testTermination() throws Exception {
- Ignite ignite = ignite(0);
-
- ignite.cluster().active(true);
-
- WorkersRegistry registry = ((IgniteKernal)ignite).context().workersRegistry();
-
- Collection<String> threadNames = new ArrayList<>(registry.names());
-
- int cnt = 0;
-
- for (String threadName : threadNames) {
- log.info("Worker termination: " + threadName);
-
- hndLatch = new CountDownLatch(1);
-
- GridWorker w = registry.worker(threadName);
-
- Thread t = w.runner();
-
- t.interrupt();
-
- assertTrue(hndLatch.await(3, TimeUnit.SECONDS));
-
- log.info("Worker is terminated: " + threadName);
-
- cnt++;
- }
-
- assertEquals(threadNames.size(), cnt);
- }
-
- /**
- * @throws Exception If failed.
- */
public void testSyntheticWorkerTermination() throws Exception {
- hndLatch = new CountDownLatch(1);
-
IgniteEx ignite = grid(0);
- GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+ WorkersRegistry registry = ignite.context().workersRegistry();
+
+ long fdTimeout = ignite.configuration().getFailureDetectionTimeout();
+
+ GridWorker worker = new GridWorker(ignite.name(), "test-worker", log, registry) {
@Override protected void body() throws InterruptedException {
- Thread.sleep(ignite.configuration().getFailureDetectionTimeout() / 2);
+ Thread.sleep(fdTimeout / 2);
}
};
- new IgniteThread(worker).start();
+ IgniteThread thread = new IgniteThread(worker);
- while (worker.runner() == null)
- Thread.sleep(10);
+ failureHndThreadName = null;
- ignite.context().workersRegistry().register(worker);
+ thread.start();
- worker.runner().join();
+ thread.join();
- assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS));
+ assertTrue(GridTestUtils.waitForCondition(() -> thread.getName().equals(failureHndThreadName), fdTimeout * 2));
}
/**
@@ -157,7 +114,8 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
private class TestFailureHandler extends AbstractFailureHandler {
/** {@inheritDoc} */
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
- hndLatch.countDown();
+ if (failureCtx.type() == FailureType.SYSTEM_WORKER_TERMINATION)
+ failureHndThreadName = Thread.currentThread().getName();
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
index 09dce9b..5ac75d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
@@ -52,12 +52,14 @@ public class TestFailureHandler extends AbstractFailureHandler {
/** {@inheritDoc} */
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
- this.failureCtx = failureCtx;
+ if (this.failureCtx == null) {
+ this.failureCtx = failureCtx;
- if (latch != null)
- latch.countDown();
+ if (latch != null)
+ latch.countDown();
- ignite.log().warning("Handled ignite failure: " + failureCtx);
+ ignite.log().warning("Handled ignite failure: " + failureCtx);
+ }
return invalidate;
}