You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text rendering.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/10/04 17:05:28 UTC

ignite git commit: IGNITE-9744 Fix SYSTEM_WORKER_TERMINATION detection in general case - Fixes #4876.

Repository: ignite
Updated Branches:
  refs/heads/master c59791196 -> a6d0bd4b5


IGNITE-9744 Fix SYSTEM_WORKER_TERMINATION detection in general case - Fixes #4876.

Signed-off-by: Ivan Rakov <ir...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6d0bd4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6d0bd4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6d0bd4b

Branch: refs/heads/master
Commit: a6d0bd4b5551231b0516eaf0ebd8112b45bba86d
Parents: c597911
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Thu Oct 4 19:45:07 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Thu Oct 4 19:45:07 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  3 +
 .../ignite/internal/util/nio/GridNioServer.java |  6 ++
 .../ignite/internal/util/worker/GridWorker.java |  4 +-
 .../ignite/internal/worker/WorkersRegistry.java |  3 +
 .../failure/SystemWorkersTerminationTest.java   | 72 ++++----------------
 .../ignite/failure/TestFailureHandler.java      | 10 +--
 6 files changed, 35 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4c254b0..cc1fd33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2548,6 +2548,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
                 else if (err != null)
                     cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+                else
+                    // In case of reconnectNeeded == true, prevent general-case termination handling.
+                    cancel();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e4c96b4..f9fda8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1830,6 +1830,9 @@ public class GridNioServer<T> {
                 }
                 else if (err != null)
                     lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+                else
+                    // In case of closed == true, prevent general-case termination handling.
+                    cancel();
             }
         }
 
@@ -2906,6 +2909,9 @@ public class GridNioServer<T> {
                     lsnr.onFailure(CRITICAL_ERROR, err);
                 else if (err != null)
                     lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+                else
+                    // In case of closed == true, prevent general-case termination handling.
+                    cancel();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 3d9163d..3f779da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -99,12 +99,12 @@ public abstract class GridWorker implements Runnable {
 
     /** {@inheritDoc} */
     @Override public final void run() {
+        updateHeartbeat();
+
         // Runner thread must be recorded first as other operations
         // may depend on it being present.
         runner = Thread.currentThread();
 
-        updateHeartbeat();
-
         if (log.isDebugEnabled())
             log.debug("Grid runnable started: " + name);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
index 55740a4..848bb59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
@@ -143,6 +143,9 @@ public class WorkersRegistry implements GridWorkerListener {
 
     /** {@inheritDoc} */
     @Override public void onStopped(GridWorker w) {
+        if (!w.isCancelled())
+            workerFailedHnd.apply(w, SYSTEM_WORKER_TERMINATION);
+
         unregister(w.runner().getName());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
index 638e6f1..1cebe22 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersTerminationTest.java
@@ -17,19 +17,15 @@
 
 package org.apache.ignite.failure;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteThread;
 
@@ -37,11 +33,8 @@ import org.apache.ignite.thread.IgniteThread;
  * Tests system critical workers termination.
  */
 public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
-    /** Handler latch. */
-    private static volatile CountDownLatch hndLatch;
-
     /** */
-    private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+    private static volatile String failureHndThreadName;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -58,8 +51,6 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(dsCfg);
 
-        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
-
         return cfg;
     }
 
@@ -84,62 +75,28 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testTermination() throws Exception {
-        Ignite ignite = ignite(0);
-
-        ignite.cluster().active(true);
-
-        WorkersRegistry registry = ((IgniteKernal)ignite).context().workersRegistry();
-
-        Collection<String> threadNames = new ArrayList<>(registry.names());
-
-        int cnt = 0;
-
-        for (String threadName : threadNames) {
-            log.info("Worker termination: " + threadName);
-
-            hndLatch = new CountDownLatch(1);
-
-            GridWorker w = registry.worker(threadName);
-
-            Thread t = w.runner();
-
-            t.interrupt();
-
-            assertTrue(hndLatch.await(3, TimeUnit.SECONDS));
-
-            log.info("Worker is terminated: " + threadName);
-
-            cnt++;
-        }
-
-        assertEquals(threadNames.size(), cnt);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testSyntheticWorkerTermination() throws Exception {
-        hndLatch = new CountDownLatch(1);
-
         IgniteEx ignite = grid(0);
 
-        GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+        WorkersRegistry registry = ignite.context().workersRegistry();
+
+        long fdTimeout = ignite.configuration().getFailureDetectionTimeout();
+
+        GridWorker worker = new GridWorker(ignite.name(), "test-worker", log, registry) {
             @Override protected void body() throws InterruptedException {
-                Thread.sleep(ignite.configuration().getFailureDetectionTimeout() / 2);
+                Thread.sleep(fdTimeout / 2);
             }
         };
 
-        new IgniteThread(worker).start();
+        IgniteThread thread = new IgniteThread(worker);
 
-        while (worker.runner() == null)
-            Thread.sleep(10);
+        failureHndThreadName = null;
 
-        ignite.context().workersRegistry().register(worker);
+        thread.start();
 
-        worker.runner().join();
+        thread.join();
 
-        assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() * 2, TimeUnit.MILLISECONDS));
+        assertTrue(GridTestUtils.waitForCondition(() -> thread.getName().equals(failureHndThreadName), fdTimeout * 2));
     }
 
     /**
@@ -157,7 +114,8 @@ public class SystemWorkersTerminationTest extends GridCommonAbstractTest {
     private class TestFailureHandler extends AbstractFailureHandler {
         /** {@inheritDoc} */
         @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
-            hndLatch.countDown();
+            if (failureCtx.type() == FailureType.SYSTEM_WORKER_TERMINATION)
+                failureHndThreadName = Thread.currentThread().getName();
 
             return false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6d0bd4b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
index 09dce9b..5ac75d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java
@@ -52,12 +52,14 @@ public class TestFailureHandler extends AbstractFailureHandler {
 
     /** {@inheritDoc} */
     @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
-        this.failureCtx = failureCtx;
+        if (this.failureCtx == null) {
+            this.failureCtx = failureCtx;
 
-        if (latch != null)
-            latch.countDown();
+            if (latch != null)
+                latch.countDown();
 
-        ignite.log().warning("Handled ignite failure: " + failureCtx);
+            ignite.log().warning("Handled ignite failure: " + failureCtx);
+        }
 
         return invalidate;
     }