You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@ignite.apache.org by al...@apache.org on 2021/07/16 06:35:58 UTC
[ignite] branch master updated: IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a47ff44 IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.
a47ff44 is described below
commit a47ff44f40a1dbdeaa03289966ce8055e5b0127f
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jul 16 09:32:54 2021 +0300
IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../ignite/internal/util/worker/GridWorker.java | 18 +++++++-
.../ignite/failure/SystemWorkersBlockingTest.java | 52 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 5926b9c..615d506 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
/** Timestamp to be updated by this worker periodically to indicate it's up and running. */
private volatile long heartbeatTs;
+ /** Field updater used to compare-and-set the volatile {@link #heartbeatTs} field atomically (see {@code updateHeartbeat()}). */
+ private static final AtomicLongFieldUpdater<GridWorker> HEARTBEAT_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs");
+
/** Mutex for finish awaiting. */
private final Object mux = new Object();
@@ -273,7 +278,16 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
/** {@inheritDoc} */
@Override public void updateHeartbeat() {
- heartbeatTs = U.currentTimeMillis();
+ long curTs = U.currentTimeMillis();
+ long hbTs = heartbeatTs;
+
+ // Avoid heartbeat update while in the blocking section: the value is only ever moved forward, so anything already >= curTs (presumably the far-future marker written by blockingSectionBegin() — confirm) is kept intact even under concurrent callers.
+ while (hbTs < curTs) {
+ if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs))
+ return;
+
+ hbTs = heartbeatTs; // CAS lost a race: re-read and retry only while the stored value is still older than curTs.
+ }
}
/** {@inheritDoc} */
@@ -283,7 +297,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
/** {@inheritDoc} */
@Override public void blockingSectionEnd() {
- updateHeartbeat();
+ heartbeatTs = U.currentTimeMillis(); // Plain write on purpose: updateHeartbeat() never lowers the value, so it could not replace the (presumably larger) blocking-section value with the current time.
}
/** Can be called from {@link #runner()} thread to perform idleness handling. */
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
index ccfc507..57495da 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.testframework.GridTestUtils;
@@ -127,6 +128,57 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
}
/**
+ * Checks that calling {@link GridWorker#updateHeartbeat()} from another thread while the worker is inside a
+ * blocking section does not make the worker look blocked (the test expects {@code failureError} to stay empty),
+ * and that after {@link GridWorker#blockingSectionEnd()} the heartbeat is not a timestamp in the future.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBlockingSection() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch blockingSectionLatch = new CountDownLatch(1);
+ CountDownLatch endLatch = new CountDownLatch(1);
+
+ GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
+ @Override protected void body() {
+ blockingSectionBegin();
+
+ try {
+ startLatch.countDown();
+
+ blockingSectionLatch.await();
+ }
+ catch (Exception ignore) {
+ // No-op. NOTE(review): an InterruptedException is swallowed here without restoring the thread's interrupt flag.
+ }
+ finally {
+ blockingSectionEnd();
+
+ endLatch.countDown();
+ }
+ }
+ };
+
+ runWorker(worker);
+
+ ignite.context().workersRegistry().register(worker);
+
+ startLatch.await();
+
+ // Check that concurrent heartbeat update doesn't affect the blocking section.
+ worker.updateHeartbeat();
+
+ Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); // Keep the worker in the blocking section well past the timeout (assumes this constant is the configured blocked-worker threshold — confirm).
+
+ blockingSectionLatch.countDown();
+
+ endLatch.await();
+
+ assertNull(failureError.get());
+
+ assertTrue(worker.heartbeatTs() <= U.currentTimeMillis()); // After blockingSectionEnd() the heartbeat must be a real timestamp again, not a future value.
+ }
+
+ /**
* Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker}
* doesn't lead to infinite loop.
*