You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/06/19 14:41:23 UTC
[hadoop] branch trunk updated: HDDS-1454. GC other system pause
events can trigger pipeline destroy for all the nodes in the cluster.
Contributed by Supratim Deka (#852)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9d68425 HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)
9d68425 is described below
commit 9d6842501c88304ca24062d2463480bc7fbe5e57
Author: supratimdeka <46...@users.noreply.github.com>
AuthorDate: Wed Jun 19 20:11:16 2019 +0530
HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)
---
.../hadoop/hdds/scm/node/NodeStateManager.java | 167 +++++++++++++++++----
.../hadoop/hdds/scm/node/SCMNodeManager.java | 29 ++++
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 92 ++++++++++++
3 files changed, 258 insertions(+), 30 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index c54944b..08a68be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.node;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -117,6 +119,26 @@ public class NodeStateManager implements Runnable, Closeable {
private final long deadNodeIntervalMs;
/**
+ * The future is used to pause/unpause the scheduled checks.
+ */
+ private ScheduledFuture<?> healthCheckFuture;
+
+ /**
+ * Test utility - tracks if health check has been paused (unit tests).
+ */
+ private boolean checkPaused;
+
+ /**
+ * timestamp recorded each time the next heartbeat check is scheduled.
+ */
+ private long lastHealthCheck;
+
+ /**
+ * number of times the heart beat check was skipped.
+ */
+ private long skippedHealthChecks;
+
+ /**
* Constructs a NodeStateManager instance with the given configuration.
*
* @param conf Configuration
@@ -143,10 +165,11 @@ public class NodeStateManager implements Runnable, Closeable {
executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build());
- //BUG:BUG TODO: The return value is ignored, if an exception is thrown in
- // the executing funtion, it will be ignored.
- executorService.schedule(this, heartbeatCheckerIntervalMs,
- TimeUnit.MILLISECONDS);
+
+ skippedHealthChecks = 0;
+ checkPaused = false; // accessed only from test functions
+
+ scheduleNextHealthCheck();
}
/**
@@ -464,6 +487,42 @@ public class NodeStateManager implements Runnable, Closeable {
@Override
public void run() {
+ // One iteration of the periodic health check: either skip (and count the
+ // skip) or examine node health, then always try to schedule the next run.
+ if (shouldSkipCheck()) {
+ skippedHealthChecks++;
+ LOG.info("Detected long delay in scheduling HB processing thread. "
+ + "Skipping heartbeat checks for one iteration.");
+ } else {
+ checkNodesHealth();
+ }
+
+ // We purposefully make this non-deterministic. Instead of using
+ // scheduleAtFixedRate we will just go to sleep
+ // and wake up at the next rendezvous point, which is currentTime +
+ // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+ // heart beating not at a fixed cadence, but clock tick + time taken to
+ // work.
+ //
+ // This time taken to work can skew the heartbeat processor thread.
+ // The reason why we don't care is because of the following reasons.
+ //
+ // 1. checkerInterval is generally many magnitudes faster than datanode HB
+ // frequency.
+ //
+ // 2. if we have too many nodes, the SCM would be doing only HB
+ // processing, this could lead to SCM's CPU starvation. With this
+ // approach we always guarantee that HB thread sleeps for a little while.
+ //
+ // 3. It is possible that we will never finish processing the HB's in the
+ // thread. But that means we have a mis-configured system. We will warn
+ // the users by logging that information.
+ //
+ // 4. And the most important reason, heartbeats are not blocked even if
+ // this thread does not run, they will go into the processing queue.
+ scheduleNextHealthCheck();
+ }
+
+ private void checkNodesHealth() {
+
/*
*
* staleNodeDeadline healthyNodeDeadline
@@ -558,41 +617,36 @@ public class NodeStateManager implements Runnable, Closeable {
heartbeatCheckerIntervalMs);
}
- // we purposefully make this non-deterministic. Instead of using a
- // scheduleAtFixedFrequency we will just go to sleep
- // and wake up at the next rendezvous point, which is currentTime +
- // heartbeatCheckerIntervalMs. This leads to the issue that we are now
- // heart beating not at a fixed cadence, but clock tick + time taken to
- // work.
- //
- // This time taken to work can skew the heartbeat processor thread.
- // The reason why we don't care is because of the following reasons.
- //
- // 1. checkerInterval is general many magnitudes faster than datanode HB
- // frequency.
- //
- // 2. if we have too much nodes, the SCM would be doing only HB
- // processing, this could lead to SCM's CPU starvation. With this
- // approach we always guarantee that HB thread sleeps for a little while.
- //
- // 3. It is possible that we will never finish processing the HB's in the
- // thread. But that means we have a mis-configured system. We will warn
- // the users by logging that information.
- //
- // 4. And the most important reason, heartbeats are not blocked even if
- // this thread does not run, they will go into the processing queue.
+ }
+
+ /**
+ * Schedules the next run of this health check after
+ * heartbeatCheckerIntervalMs, unless the current thread is interrupted or
+ * the executor has been shut down, and records the current monotonic time
+ * in lastHealthCheck.
+ */
+ private void scheduleNextHealthCheck() {
if (!Thread.currentThread().isInterrupted() &&
!executorService.isShutdown()) {
//BUGBUG: The returned future needs to be checked here to make sure the
// exceptions are handled correctly.
- executorService.schedule(this, heartbeatCheckerIntervalMs,
- TimeUnit.MILLISECONDS);
+ healthCheckFuture = executorService.schedule(this,
+ heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
} else {
// NOTE: this branch is also taken when the executor is shut down, not
// only when the current thread is interrupted.
- LOG.info("Current Thread is interrupted, shutting down HB processing " +
+ LOG.warn("Current Thread is interrupted, shutting down HB processing " +
"thread for Node Manager.");
}
// Updated on every call, whether or not a new check was scheduled.
+ lastHealthCheck = Time.monotonicNow();
+ }
+
+ /**
+ * Decides whether the current health check iteration must be skipped.
+ * When the time elapsed since lastHealthCheck reaches the smaller of the
+ * stale and dead node intervals, the delay may have been caused by a JVM
+ * pause, and SCM cannot draw reliable conclusions about datanode health.
+ * @return true if the heartbeat checks should be skipped this iteration
+ */
+ private boolean shouldSkipCheck() {
+ final long now = Time.monotonicNow();
+ final long elapsedSinceLastCheck = now - lastHealthCheck;
+ final long skipThreshold =
+ Math.min(staleNodeIntervalMs, deadNodeIntervalMs);
+
+ return elapsedSinceLastCheck >= skipThreshold;
}
/**
@@ -640,4 +694,57 @@ public class NodeStateManager implements Runnable, Closeable {
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Test Utility : return number of times heartbeat check was skipped.
+ * The counter is incremented in run() each time shouldSkipCheck() returns
+ * true.
+ * @return : count of times HB process was skipped
+ */
+ @VisibleForTesting
+ long getSkippedHealthChecks() {
+ return skippedHealthChecks;
+ }
+
+ /**
+ * Test Utility : Pause the periodic node hb check by cancelling the
+ * currently scheduled check.
+ * @return ScheduledFuture for the scheduled check that got cancelled, or
+ * null if the executor is shut down, the check is already paused, or no
+ * check has been scheduled.
+ */
+ @VisibleForTesting
+ ScheduledFuture<?> pause() {
+
+ // healthCheckFuture is only assigned when a check was actually scheduled;
+ // it stays null if the first scheduling attempt was refused. unpause()
+ // already tolerates null, so pause() must not dereference it blindly.
+ if (executorService.isShutdown() || checkPaused
+ || healthCheckFuture == null) {
+ return null;
+ }
+
+ // cancel(false): do not interrupt a check that is already executing.
+ checkPaused = healthCheckFuture.cancel(false);
+
+ return healthCheckFuture;
+ }
+
+ /**
+ * Test utility : unpause the periodic node hb check.
+ * Has no effect on scheduling unless the check is currently paused.
+ * @return ScheduledFuture for the next scheduled check, or null if the
+ * executor is shut down
+ */
+ @VisibleForTesting
+ ScheduledFuture<?> unpause() {
+
+ if (executorService.isShutdown()) {
+ return null;
+ }
+
+ if (checkPaused) {
+ // A paused check must not have a live scheduled future.
+ Preconditions.checkState(((healthCheckFuture == null)
+ || healthCheckFuture.isCancelled()
+ || healthCheckFuture.isDone()));
+
+ checkPaused = false;
+ // We do not call scheduleNextHealthCheck because we are
+ // not updating the lastHealthCheck timestamp.
+ healthCheckFuture = executorService.schedule(this,
+ heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ return healthCheckFuture;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index eaa2255..a85271e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.node;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
@@ -72,6 +73,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
/**
@@ -580,4 +582,31 @@ public class SCMNodeManager implements NodeManager {
return null;
}
}
+
+ /**
+ * Test utility to stop heartbeat check process.
+ * Delegates to the node state manager's pause().
+ * @return ScheduledFuture of next scheduled check that got cancelled.
+ */
+ @VisibleForTesting
+ ScheduledFuture<?> pauseHealthCheck() {
+ return nodeStateManager.pause();
+ }
+
+ /**
+ * Test utility to resume the paused heartbeat check process.
+ * Delegates to the node state manager's unpause().
+ * @return ScheduledFuture of the next scheduled check
+ */
+ @VisibleForTesting
+ ScheduledFuture<?> unpauseHealthCheck() {
+ return nodeStateManager.unpause();
+ }
+
+ /**
+ * Test utility to get the count of skipped heartbeat check iterations.
+ * Delegates to the node state manager.
+ * @return count of skipped heartbeat check iterations
+ */
+ @VisibleForTesting
+ long getSkippedHealthChecks() {
+ return nodeStateManager.getSkippedHealthChecks();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 60fc204..ae81071 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -53,6 +53,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -346,6 +348,96 @@ public class TestSCMNodeManager {
}
/**
+ * Simulate a JVM Pause by pausing the health check process
+ * Ensure that none of the nodes with heartbeats become Dead or Stale.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws AuthenticationException
+ */
+ @Test
+ public void testScmHandleJvmPause()
+ throws IOException, InterruptedException, AuthenticationException {
+ final int healthCheckInterval = 200; // milliseconds
+ final int heartbeatInterval = 1; // seconds
+ final int staleNodeInterval = 3; // seconds
+ final int deadNodeInterval = 6; // seconds
+ ScheduledFuture<?> schedFuture;
+
+ OzoneConfiguration conf = getConf();
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+ healthCheckInterval, MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
+ heartbeatInterval, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
+ staleNodeInterval, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
+ deadNodeInterval, SECONDS);
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ DatanodeDetails node1 =
+ TestUtils.createRandomDatanodeAndRegister(nodeManager);
+ DatanodeDetails node2 =
+ TestUtils.createRandomDatanodeAndRegister(nodeManager);
+
+ nodeManager.processHeartbeat(node1);
+ nodeManager.processHeartbeat(node2);
+
+ // Sleep so that heartbeat processing thread gets to run.
+ Thread.sleep(1000);
+
+ //Assert all nodes are healthy.
+ assertEquals(2, nodeManager.getAllNodes().size());
+ assertEquals(2, nodeManager.getNodeCount(HEALTHY));
+
+ /*
+ * Simulate a JVM Pause and subsequent handling in following steps:
+ * Step 1 : stop heartbeat check process for stale node interval
+ * Step 2 : resume heartbeat check
+ * Step 3 : wait for 1 iteration of heartbeat check thread
+ * Step 4 : retrieve the state of all nodes - assert all are HEALTHY
+ * Step 5 : heartbeat for node1
+ * [TODO : what if there is scheduling delay of test thread in Step 5?]
+ * Step 6 : wait for some time to allow iterations of check process
+ * Step 7 : retrieve the state of all nodes - assert node2 is STALE
+ * and node1 is HEALTHY
+ */
+
+ // Step 1 : stop health check process (simulate JVM pause)
+ nodeManager.pauseHealthCheck();
+ Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));
+
+ // Step 2 : resume health check
+ assertTrue("Unexpected, already skipped heartbeat checks",
+ (nodeManager.getSkippedHealthChecks() == 0));
+ schedFuture = nodeManager.unpauseHealthCheck();
+
+ // Step 3 : wait for 1 iteration of health check
+ try {
+ schedFuture.get();
+ assertTrue("We did not skip any heartbeat checks",
+ nodeManager.getSkippedHealthChecks() > 0);
+ } catch (ExecutionException e) {
+ // Fail explicitly and keep the cause, instead of a dummy
+ // assertEquals(0, 1) that hides why the scheduled check failed.
+ throw new AssertionError(
+ "Unexpected exception waiting for Scheduled Health Check", e);
+ }
+
+ // Step 4 : all nodes should still be HEALTHY
+ assertEquals(2, nodeManager.getAllNodes().size());
+ assertEquals(2, nodeManager.getNodeCount(HEALTHY));
+
+ // Step 5 : heartbeat for node1
+ nodeManager.processHeartbeat(node1);
+
+ // Step 6 : wait for health check process to run
+ Thread.sleep(1000);
+
+ // Step 7 : node2 should transition to STALE
+ assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+ assertEquals(1, nodeManager.getNodeCount(STALE));
+ }
+ }
+
+ /**
* Check for NPE when datanodeDetails is passed null for sendHeartbeat.
*
* @throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org