You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/06/19 14:41:23 UTC

[hadoop] branch trunk updated: HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d68425  HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)
9d68425 is described below

commit 9d6842501c88304ca24062d2463480bc7fbe5e57
Author: supratimdeka <46...@users.noreply.github.com>
AuthorDate: Wed Jun 19 20:11:16 2019 +0530

    HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)
---
 .../hadoop/hdds/scm/node/NodeStateManager.java     | 167 +++++++++++++++++----
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  29 ++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  92 ++++++++++++
 3 files changed, 258 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index c54944b..08a68be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.node;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.*;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
@@ -117,6 +119,26 @@ public class NodeStateManager implements Runnable, Closeable {
   private final long deadNodeIntervalMs;
 
   /**
+   * The future is used to pause/unpause the scheduled checks.
+   */
+  private ScheduledFuture<?> healthCheckFuture;
+
+  /**
+   * Test utility - tracks if health check has been paused (unit tests).
+   */
+  private boolean checkPaused;
+
+  /**
+   * timestamp of the latest heartbeat check process.
+   */
+  private long lastHealthCheck;
+
+  /**
+   * number of times the heart beat check was skipped.
+   */
+  private long skippedHealthChecks;
+
+  /**
    * Constructs a NodeStateManager instance with the given configuration.
    *
    * @param conf Configuration
@@ -143,10 +165,11 @@ public class NodeStateManager implements Runnable, Closeable {
     executorService = HadoopExecutors.newScheduledThreadPool(1,
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
-    //BUG:BUG TODO: The return value is ignored, if an exception is thrown in
-    // the executing funtion, it will be ignored.
-    executorService.schedule(this, heartbeatCheckerIntervalMs,
-        TimeUnit.MILLISECONDS);
+
+    skippedHealthChecks = 0;
+    checkPaused = false; // accessed only from test functions
+
+    scheduleNextHealthCheck();
   }
 
   /**
@@ -464,6 +487,42 @@ public class NodeStateManager implements Runnable, Closeable {
   @Override
   public void run() {
 
+    if (shouldSkipCheck()) {
+      skippedHealthChecks++;
+      LOG.info("Detected long delay in scheduling HB processing thread. "
+          + "Skipping heartbeat checks for one iteration.");
+    } else {
+      checkNodesHealth();
+    }
+
+    // we purposefully make this non-deterministic. Instead of using a
+    // scheduleAtFixedFrequency  we will just go to sleep
+    // and wake up at the next rendezvous point, which is currentTime +
+    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+    // heart beating not at a fixed cadence, but clock tick + time taken to
+    // work.
+    //
+    // This time taken to work can skew the heartbeat processor thread.
+    // The reason why we don't care is because of the following reasons.
+    //
+    // 1. checkerInterval is general many magnitudes faster than datanode HB
+    // frequency.
+    //
+    // 2. if we have too much nodes, the SCM would be doing only HB
+    // processing, this could lead to SCM's CPU starvation. With this
+    // approach we always guarantee that  HB thread sleeps for a little while.
+    //
+    // 3. It is possible that we will never finish processing the HB's in the
+    // thread. But that means we have a mis-configured system. We will warn
+    // the users by logging that information.
+    //
+    // 4. And the most important reason, heartbeats are not blocked even if
+    // this thread does not run, they will go into the processing queue.
+    scheduleNextHealthCheck();
+  }
+
+  private void checkNodesHealth() {
+
     /*
      *
      *          staleNodeDeadline                healthyNodeDeadline
@@ -558,41 +617,36 @@ public class NodeStateManager implements Runnable, Closeable {
           heartbeatCheckerIntervalMs);
     }
 
-    // we purposefully make this non-deterministic. Instead of using a
-    // scheduleAtFixedFrequency  we will just go to sleep
-    // and wake up at the next rendezvous point, which is currentTime +
-    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
-    // heart beating not at a fixed cadence, but clock tick + time taken to
-    // work.
-    //
-    // This time taken to work can skew the heartbeat processor thread.
-    // The reason why we don't care is because of the following reasons.
-    //
-    // 1. checkerInterval is general many magnitudes faster than datanode HB
-    // frequency.
-    //
-    // 2. if we have too much nodes, the SCM would be doing only HB
-    // processing, this could lead to SCM's CPU starvation. With this
-    // approach we always guarantee that  HB thread sleeps for a little while.
-    //
-    // 3. It is possible that we will never finish processing the HB's in the
-    // thread. But that means we have a mis-configured system. We will warn
-    // the users by logging that information.
-    //
-    // 4. And the most important reason, heartbeats are not blocked even if
-    // this thread does not run, they will go into the processing queue.
+  }
+
+  private void scheduleNextHealthCheck() {
 
     if (!Thread.currentThread().isInterrupted() &&
         !executorService.isShutdown()) {
       //BUGBUG: The return future needs to checked here to make sure the
       // exceptions are handled correctly.
-      executorService.schedule(this, heartbeatCheckerIntervalMs,
-          TimeUnit.MILLISECONDS);
+      healthCheckFuture = executorService.schedule(this,
+          heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
     } else {
-      LOG.info("Current Thread is interrupted, shutting down HB processing " +
+      LOG.warn("Current Thread is interrupted, shutting down HB processing " +
           "thread for Node Manager.");
     }
 
+    lastHealthCheck = Time.monotonicNow();
+  }
+
+  /**
+   * if the time since last check exceeds the stale|dead node interval, skip.
+   * such long delays might be caused by a JVM pause. SCM cannot make reliable
+   * conclusions about datanode health in such situations.
+   * @return : true indicates skip HB checks
+   */
+  private boolean shouldSkipCheck() {
+
+    long currentTime = Time.monotonicNow();
+    long minInterval = Math.min(staleNodeIntervalMs, deadNodeIntervalMs);
+
+    return ((currentTime - lastHealthCheck) >= minInterval);
   }
 
   /**
@@ -640,4 +694,57 @@ public class NodeStateManager implements Runnable, Closeable {
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * Test Utility : return number of times heartbeat check was skipped.
+   * @return : count of times HB process was skipped
+   */
+  @VisibleForTesting
+  long getSkippedHealthChecks() {
+    return skippedHealthChecks;
+  }
+
+  /**
+   * Test Utility : Pause the periodic node hb check.
+   * @return ScheduledFuture for the scheduled check that got cancelled.
+   */
+  @VisibleForTesting
+  ScheduledFuture pause() {
+
+    if (executorService.isShutdown() || checkPaused) {
+      return null;
+    }
+
+    checkPaused = healthCheckFuture.cancel(false);
+
+    return healthCheckFuture;
+  }
+
+  /**
+   * Test utility : unpause the periodic node hb check.
+   * @return ScheduledFuture for the next scheduled check
+   */
+  @VisibleForTesting
+  ScheduledFuture unpause() {
+
+    if (executorService.isShutdown()) {
+      return null;
+    }
+
+    if (checkPaused) {
+      Preconditions.checkState(((healthCheckFuture == null)
+          || healthCheckFuture.isCancelled()
+          || healthCheckFuture.isDone()));
+
+      checkPaused = false;
+      /**
+       * We do not call scheduleNextHealthCheck because we are
+       * not updating the lastHealthCheck timestamp.
+       */
+      healthCheckFuture = executorService.schedule(this,
+          heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    return healthCheckFuture;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index eaa2255..a85271e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
@@ -72,6 +73,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
 import java.util.stream.Collectors;
 
 /**
@@ -580,4 +582,31 @@ public class SCMNodeManager implements NodeManager {
       return null;
     }
   }
+
+  /**
+   * Test utility to stop heartbeat check process.
+   * @return ScheduledFuture of next scheduled check that got cancelled.
+   */
+  @VisibleForTesting
+  ScheduledFuture pauseHealthCheck() {
+    return nodeStateManager.pause();
+  }
+
+  /**
+   * Test utility to resume the paused heartbeat check process.
+   * @return ScheduledFuture of the next scheduled check
+   */
+  @VisibleForTesting
+  ScheduledFuture unpauseHealthCheck() {
+    return nodeStateManager.unpause();
+  }
+
+  /**
+   * Test utility to get the count of skipped heartbeat check iterations.
+   * @return count of skipped heartbeat check iterations
+   */
+  @VisibleForTesting
+  long getSkippedHealthChecks() {
+    return nodeStateManager.getSkippedHealthChecks();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 60fc204..ae81071 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -53,6 +53,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -346,6 +348,96 @@ public class TestSCMNodeManager {
   }
 
   /**
+   * Simulate a JVM Pause by pausing the health check process
+   * Ensure that none of the nodes with heartbeats become Dead or Stale.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws AuthenticationException
+   */
+  @Test
+  public void testScmHandleJvmPause()
+      throws IOException, InterruptedException, AuthenticationException {
+    final int healthCheckInterval = 200; // milliseconds
+    final int heartbeatInterval = 1; // seconds
+    final int staleNodeInterval = 3; // seconds
+    final int deadNodeInterval = 6; // seconds
+    ScheduledFuture schedFuture;
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        healthCheckInterval, MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
+        heartbeatInterval, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
+        staleNodeInterval, SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
+        deadNodeInterval, SECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeDetails node1 =
+          TestUtils.createRandomDatanodeAndRegister(nodeManager);
+      DatanodeDetails node2 =
+          TestUtils.createRandomDatanodeAndRegister(nodeManager);
+
+      nodeManager.processHeartbeat(node1);
+      nodeManager.processHeartbeat(node2);
+
+      // Sleep so that heartbeat processing thread gets to run.
+      Thread.sleep(1000);
+
+      //Assert all nodes are healthy.
+      assertEquals(2, nodeManager.getAllNodes().size());
+      assertEquals(2, nodeManager.getNodeCount(HEALTHY));
+
+      /**
+       * Simulate a JVM Pause and subsequent handling in following steps:
+       * Step 1 : stop heartbeat check process for stale node interval
+       * Step 2 : resume heartbeat check
+       * Step 3 : wait for 1 iteration of heartbeat check thread
+       * Step 4 : retrieve the state of all nodes - assert all are HEALTHY
+       * Step 5 : heartbeat for node1
+       * [TODO : what if there is scheduling delay of test thread in Step 5?]
+       * Step 6 : wait for some time to allow iterations of check process
+       * Step 7 : retrieve the state of all nodes -  assert node2 is STALE
+       * and node1 is HEALTHY
+       */
+
+      // Step 1 : stop health check process (simulate JVM pause)
+      nodeManager.pauseHealthCheck();
+      Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));
+
+      // Step 2 : resume health check
+      assertTrue("Unexpected, already skipped heartbeat checks",
+          (nodeManager.getSkippedHealthChecks() == 0));
+      schedFuture = nodeManager.unpauseHealthCheck();
+
+      // Step 3 : wait for 1 iteration of health check
+      try {
+        schedFuture.get();
+        assertTrue("We did not skip any heartbeat checks",
+            nodeManager.getSkippedHealthChecks() > 0);
+      } catch (ExecutionException e) {
+        assertEquals("Unexpected exception waiting for Scheduled Health Check",
+            0, 1);
+      }
+
+      // Step 4 : all nodes should still be HEALTHY
+      assertEquals(2, nodeManager.getAllNodes().size());
+      assertEquals(2, nodeManager.getNodeCount(HEALTHY));
+
+      // Step 5 : heartbeat for node1
+      nodeManager.processHeartbeat(node1);
+
+      // Step 6 : wait for health check process to run
+      Thread.sleep(1000);
+
+      // Step 7 : node2 should transition to STALE
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(1, nodeManager.getNodeCount(STALE));
+    }
+  }
+
+  /**
    * Check for NPE when datanodeDetails is passed null for sendHeartbeat.
    *
    * @throws IOException


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org