You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/07/16 06:35:58 UTC

[ignite] branch master updated: IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a47ff44  IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.
a47ff44 is described below

commit a47ff44f40a1dbdeaa03289966ce8055e5b0127f
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jul 16 09:32:54 2021 +0300

    IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../ignite/internal/util/worker/GridWorker.java    | 18 +++++++-
 .../ignite/failure/SystemWorkersBlockingTest.java  | 52 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 5926b9c..615d506 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
     /** Timestamp to be updated by this worker periodically to indicate it's up and running. */
     private volatile long heartbeatTs;
 
+    /** Atomic field updater to change heartbeat. */
+    private static final AtomicLongFieldUpdater<GridWorker> HEARTBEAT_UPDATER =
+        AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs");
+
     /** Mutex for finish awaiting. */
     private final Object mux = new Object();
 
@@ -273,7 +278,16 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
 
     /** {@inheritDoc} */
     @Override public void updateHeartbeat() {
-        heartbeatTs = U.currentTimeMillis();
+        long curTs = U.currentTimeMillis();
+        long hbTs = heartbeatTs;
+
+        // Avoid heartbeat update while in the blocking section.
+        while (hbTs < curTs) {
+            if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs))
+                return;
+
+            hbTs = heartbeatTs;
+        }
     }
 
     /** {@inheritDoc} */
@@ -283,7 +297,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
 
     /** {@inheritDoc} */
     @Override public void blockingSectionEnd() {
-        updateHeartbeat();
+        heartbeatTs = U.currentTimeMillis();
     }
 
     /** Can be called from {@link #runner()} thread to perform idleness handling. */
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
index ccfc507..57495da 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -127,6 +128,57 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBlockingSection() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch blockingSectionLatch = new CountDownLatch(1);
+        CountDownLatch endLatch = new CountDownLatch(1);
+
+        GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+            @Override protected void body() {
+                blockingSectionBegin();
+
+                try {
+                    startLatch.countDown();
+
+                    blockingSectionLatch.await();
+                }
+                catch (Exception ignore) {
+                    // No-op.
+                }
+                finally {
+                    blockingSectionEnd();
+
+                    endLatch.countDown();
+                }
+            }
+        };
+
+        runWorker(worker);
+
+        ignite.context().workersRegistry().register(worker);
+
+        startLatch.await();
+
+        // Check that concurrent heartbeat update doesn't affect the blocking section.
+        worker.updateHeartbeat();
+
+        Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT);
+
+        blockingSectionLatch.countDown();
+
+        endLatch.await();
+
+        assertNull(failureError.get());
+
+        assertTrue(worker.heartbeatTs() <= U.currentTimeMillis());
+    }
+
+    /**
      * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker}
      * doesn't lead to infinite loop.
      *