You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/08/21 02:14:45 UTC

[2/2] hadoop git commit: HADOOP-15679. ShutdownHookManager shutdown time needs to be configurable & extended. Contributed by Steve Loughran.

HADOOP-15679. ShutdownHookManager shutdown time needs to be configurable & extended. Contributed by Steve Loughran.

Change-Id: Ifd4a6e3f796b4dc88e97f63066289e2534c77a29


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95fcdc04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95fcdc04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95fcdc04

Branch: refs/heads/branch-3.1
Commit: 95fcdc04355920697337db1136ffe463d4794616
Parents: 3712b79
Author: Steve Loughran <st...@apache.org>
Authored: Mon Aug 20 18:44:14 2018 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Aug 20 18:48:49 2018 -0700

----------------------------------------------------------------------
 .../fs/CommonConfigurationKeysPublic.java       |   9 +
 .../apache/hadoop/util/ShutdownHookManager.java | 169 ++++++++--
 .../src/main/resources/core-default.xml         |  16 +
 .../hadoop/util/TestShutdownHookManager.java    | 328 +++++++++++++++----
 4 files changed, 418 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/95fcdc04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 132c9bf..7a91993 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -890,5 +890,14 @@ public class CommonConfigurationKeysPublic {
           HADOOP_SECURITY_SENSITIVE_CONFIG_KEYS);
   public static final String HADOOP_SYSTEM_TAGS = "hadoop.system.tags";
   public static final String HADOOP_CUSTOM_TAGS = "hadoop.custom.tags";
+
+  /** Configuration option for the shutdown hook manager shutdown time:
+   *  {@value}. */
+  public static final String SERVICE_SHUTDOWN_TIMEOUT =
+      "hadoop.service.shutdown.timeout";
+
+  /** Default shutdown hook timeout: {@value} seconds. */
+  public static final long SERVICE_SHUTDOWN_TIMEOUT_DEFAULT = 30;
+
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95fcdc04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
index 153f92b..2ca8e55 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.util;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -34,6 +40,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT_DEFAULT;
+
 /**
  * The <code>ShutdownHookManager</code> enables running shutdownHook
  * in a deterministic order, higher priority first.
@@ -42,53 +51,55 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * This class registers a single JVM shutdownHook and run all the
  * shutdownHooks registered to it (to this class) in order based on their
  * priority.
+ *
+ * Unless a hook was registered with a shutdown timeout explicitly set through
+ * {@link #addShutdownHook(Runnable, int, long, TimeUnit)},
+ * the shutdown time allocated to it is set by the configuration option
+ * {@link CommonConfigurationKeysPublic#SERVICE_SHUTDOWN_TIMEOUT} in
+ * {@code core-site.xml}, with a default value of
+ * {@link CommonConfigurationKeysPublic#SERVICE_SHUTDOWN_TIMEOUT_DEFAULT}
+ * seconds.
  */
-public class ShutdownHookManager {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class ShutdownHookManager {
 
   private static final ShutdownHookManager MGR = new ShutdownHookManager();
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ShutdownHookManager.class);
-  private static final long TIMEOUT_DEFAULT = 10;
-  private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
+
+  /** Minimum shutdown timeout: {@value} second(s). */
+  public static final long TIMEOUT_MINIMUM = 1;
+
+  /** The default time unit used: seconds. */
+  public static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
 
   private static final ExecutorService EXECUTOR =
       HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-          .setDaemon(true).build());
+          .setDaemon(true)
+          .setNameFormat("shutdown-hook-%01d")
+          .build());
+
   static {
     try {
       Runtime.getRuntime().addShutdownHook(
         new Thread() {
           @Override
           public void run() {
-            MGR.shutdownInProgress.set(true);
-            for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
-              Future<?> future = EXECUTOR.submit(entry.getHook());
-              try {
-                future.get(entry.getTimeout(), entry.getTimeUnit());
-              } catch (TimeoutException ex) {
-                future.cancel(true);
-                LOG.warn("ShutdownHook '" + entry.getHook().getClass().
-                    getSimpleName() + "' timeout, " + ex.toString(), ex);
-              } catch (Throwable ex) {
-                LOG.warn("ShutdownHook '" + entry.getHook().getClass().
-                    getSimpleName() + "' failed, " + ex.toString(), ex);
-              }
-            }
-            try {
-              EXECUTOR.shutdown();
-              if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
-                  TIME_UNIT_DEFAULT)) {
-                LOG.error("ShutdownHookManger shutdown forcefully.");
-                EXECUTOR.shutdownNow();
-              }
-              LOG.debug("ShutdownHookManger complete shutdown.");
-            } catch (InterruptedException ex) {
-              LOG.error("ShutdownHookManger interrupted while waiting for " +
-                  "termination.", ex);
-              EXECUTOR.shutdownNow();
-              Thread.currentThread().interrupt();
+            if (MGR.shutdownInProgress.getAndSet(true)) {
+              LOG.info("Shutdown process invoked a second time: ignoring");
+              return;
             }
+            long started = System.currentTimeMillis();
+            int timeoutCount = executeShutdown();
+            long ended = System.currentTimeMillis();
+            LOG.debug(String.format(
+                "Completed shutdown in %.3f seconds; Timeouts: %d",
+                (ended-started)/1000.0, timeoutCount));
+            // each of the hooks has executed; now shut down the
+            // executor itself.
+            shutdownExecutor(new Configuration());
           }
         }
       );
@@ -99,18 +110,92 @@ public class ShutdownHookManager {
   }
 
   /**
+   * Execute the shutdown.
+   * This is exposed purely for testing: do not invoke it.
+   * @return the number of shutdown hooks which timed out.
+   */
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  static int executeShutdown() {
+    int timeouts = 0;
+    for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
+      Future<?> future = EXECUTOR.submit(entry.getHook());
+      try {
+        future.get(entry.getTimeout(), entry.getTimeUnit());
+      } catch (TimeoutException ex) {
+        timeouts++;
+        future.cancel(true);
+        LOG.warn("ShutdownHook '" + entry.getHook().getClass().
+            getSimpleName() + "' timeout, " + ex.toString(), ex);
+      } catch (Throwable ex) {
+        LOG.warn("ShutdownHook '" + entry.getHook().getClass().
+            getSimpleName() + "' failed, " + ex.toString(), ex);
+      }
+    }
+    return timeouts;
+  }
+
+  /**
+   * Shutdown the executor thread itself.
+   * @param conf the configuration containing the shutdown timeout setting.
+   */
+  private static void shutdownExecutor(final Configuration conf) {
+    try {
+      EXECUTOR.shutdown();
+      long shutdownTimeout = getShutdownTimeout(conf);
+      if (!EXECUTOR.awaitTermination(
+          shutdownTimeout,
+          TIME_UNIT_DEFAULT)) {
+        // timeout waiting for the executor to terminate
+        LOG.error("ShutdownHookManger shutdown forcefully after"
+            + " {} seconds.", shutdownTimeout);
+        EXECUTOR.shutdownNow();
+      }
+      LOG.debug("ShutdownHookManger completed shutdown.");
+    } catch (InterruptedException ex) {
+      // interrupted.
+      LOG.error("ShutdownHookManger interrupted while waiting for " +
+          "termination.", ex);
+      EXECUTOR.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
    * Return <code>ShutdownHookManager</code> singleton.
    *
    * @return <code>ShutdownHookManager</code> singleton.
    */
+  @InterfaceAudience.Public
   public static ShutdownHookManager get() {
     return MGR;
   }
 
   /**
+   * Get the shutdown timeout in seconds, from the supplied
+   * configuration.
+   * @param conf configuration to use.
+   * @return a timeout, always greater than or equal to {@link #TIMEOUT_MINIMUM}
+   */
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  static long getShutdownTimeout(Configuration conf) {
+    long duration = conf.getTimeDuration(
+        SERVICE_SHUTDOWN_TIMEOUT,
+        SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+        TIME_UNIT_DEFAULT);
+    if (duration < TIMEOUT_MINIMUM) {
+      duration = TIMEOUT_MINIMUM;
+    }
+    return duration;
+  }
+
+  /**
    * Private structure to store ShutdownHook, its priority and timeout
    * settings.
    */
+  @InterfaceAudience.Private
+  @VisibleForTesting
   static class HookEntry {
     private final Runnable hook;
     private final int priority;
@@ -118,7 +203,9 @@ public class ShutdownHookManager {
     private final TimeUnit unit;
 
     HookEntry(Runnable hook, int priority) {
-      this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
+      this(hook, priority,
+          getShutdownTimeout(new Configuration()),
+          TIME_UNIT_DEFAULT);
     }
 
     HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
@@ -176,10 +263,12 @@ public class ShutdownHookManager {
    *
    * @return the list of shutdownHooks in order of execution.
    */
+  @InterfaceAudience.Private
+  @VisibleForTesting
   List<HookEntry> getShutdownHooksInOrder() {
     List<HookEntry> list;
     synchronized (MGR.hooks) {
-      list = new ArrayList<HookEntry>(MGR.hooks);
+      list = new ArrayList<>(MGR.hooks);
     }
     Collections.sort(list, new Comparator<HookEntry>() {
 
@@ -200,6 +289,8 @@ public class ShutdownHookManager {
    * @param shutdownHook shutdownHook <code>Runnable</code>
    * @param priority priority of the shutdownHook.
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public void addShutdownHook(Runnable shutdownHook, int priority) {
     if (shutdownHook == null) {
       throw new IllegalArgumentException("shutdownHook cannot be NULL");
@@ -223,6 +314,8 @@ public class ShutdownHookManager {
    * @param timeout timeout of the shutdownHook
    * @param unit unit of the timeout <code>TimeUnit</code>
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
       TimeUnit unit) {
     if (shutdownHook == null) {
@@ -242,6 +335,8 @@ public class ShutdownHookManager {
    * @return TRUE if the shutdownHook was registered and removed,
    * FALSE otherwise.
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public boolean removeShutdownHook(Runnable shutdownHook) {
     if (shutdownInProgress.get()) {
       throw new IllegalStateException("Shutdown in progress, cannot remove a " +
@@ -256,6 +351,8 @@ public class ShutdownHookManager {
    * @param shutdownHook shutdownHook to check if registered.
    * @return TRUE/FALSE depending if the shutdownHook is is registered.
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public boolean hasShutdownHook(Runnable shutdownHook) {
     return hooks.contains(new HookEntry(shutdownHook, 0));
   }
@@ -265,6 +362,8 @@ public class ShutdownHookManager {
    * 
    * @return TRUE if the shutdown is in progress, otherwise FALSE.
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public boolean isShutdownInProgress() {
     return shutdownInProgress.get();
   }
@@ -272,7 +371,9 @@ public class ShutdownHookManager {
   /**
    * clear all registered shutdownHooks.
    */
+  @InterfaceAudience.Public
+  @InterfaceStability.Stable
   public void clearShutdownHooks() {
     hooks.clear();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95fcdc04/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 14e4bc4..8c71b42 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -553,6 +553,22 @@
     </description>
 </property>
 
+  <property>
+    <name>hadoop.service.shutdown.timeout</name>
+    <value>30s</value>
+    <description>
+      Timeout to wait for each shutdown operation to complete.
+      If a hook takes longer than this time to complete, it will be interrupted,
+      so the service will shutdown. This allows the service shutdown
+      to recover from a blocked operation.
+      Some shutdown hooks may need more time than this, for example when
+      a large amount of data needs to be uploaded to an object store.
+      In this situation: increase the timeout.
+
+      The minimum duration of the timeout is 1 second, "1s".
+    </description>
+</property>
+
 <property>
   <name>hadoop.rpc.protection</name>
   <value>authentication</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95fcdc04/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
index 2aa5e95..03fa903 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
@@ -17,97 +17,285 @@
  */
 package org.apache.hadoop.util;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.LoggerFactory;
-import org.junit.Assert;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
 
 import static java.lang.Thread.sleep;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestShutdownHookManager {
+
   static final Logger LOG =
       LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
 
+  /**
+   * remove all the shutdown hooks so that they never get invoked later
+   * on in this test process.
+   */
+  @After
+  public void clearShutdownHooks() {
+    ShutdownHookManager.get().clearShutdownHooks();
+  }
+
+  /**
+   * Verify hook registration, then execute the hook callback stage
+   * of shutdown to verify invocation, execution order and timeout
+   * processing.
+   */
   @Test
   public void shutdownHookManager() {
     ShutdownHookManager mgr = ShutdownHookManager.get();
-    Assert.assertNotNull(mgr);
-    Assert.assertEquals(0, mgr.getShutdownHooksInOrder().size());
-    Runnable hook1 = new Runnable() {
-      @Override
-      public void run() {
-        LOG.info("Shutdown hook1 complete.");
-      }
-    };
-    Runnable hook2 = new Runnable() {
-      @Override
-      public void run() {
-        LOG.info("Shutdown hook2 complete.");
-      }
-    };
-
-    Runnable hook3 = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          sleep(3000);
-          LOG.info("Shutdown hook3 complete.");
-        } catch (InterruptedException ex) {
-          LOG.info("Shutdown hook3 interrupted exception:",
-              ExceptionUtils.getStackTrace(ex));
-          Assert.fail("Hook 3 should not timeout.");
-        }
-      }
-    };
-
-    Runnable hook4 = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          sleep(3500);
-          LOG.info("Shutdown hook4 complete.");
-          Assert.fail("Hook 4 should timeout");
-        } catch (InterruptedException ex) {
-          LOG.info("Shutdown hook4 interrupted exception:",
-              ExceptionUtils.getStackTrace(ex));
-        }
-      }
-    };
+    assertNotNull("No ShutdownHookManager", mgr);
+    assertEquals(0, mgr.getShutdownHooksInOrder().size());
+    Hook hook1 = new Hook("hook1", 0, false);
+    Hook hook2 = new Hook("hook2", 0, false);
+    Hook hook3 = new Hook("hook3", 1000, false);
+    Hook hook4 = new Hook("hook4", 25000, true);
+    Hook hook5 = new Hook("hook5",
+        (SERVICE_SHUTDOWN_TIMEOUT_DEFAULT + 1) * 1000, true);
 
     mgr.addShutdownHook(hook1, 0);
-    Assert.assertTrue(mgr.hasShutdownHook(hook1));
-    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
-    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
-    mgr.removeShutdownHook(hook1);
-    Assert.assertFalse(mgr.hasShutdownHook(hook1));
+    assertTrue(mgr.hasShutdownHook(hook1));
+    assertEquals(1, mgr.getShutdownHooksInOrder().size());
+    assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
+    assertTrue(mgr.removeShutdownHook(hook1));
+    assertFalse(mgr.hasShutdownHook(hook1));
+    assertFalse(mgr.removeShutdownHook(hook1));
 
     mgr.addShutdownHook(hook1, 0);
-    Assert.assertTrue(mgr.hasShutdownHook(hook1));
-    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
-    Assert.assertTrue(mgr.hasShutdownHook(hook1));
-    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
+    assertTrue(mgr.hasShutdownHook(hook1));
+    assertEquals(1, mgr.getShutdownHooksInOrder().size());
+    assertEquals(SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+        mgr.getShutdownHooksInOrder().get(0).getTimeout());
 
     mgr.addShutdownHook(hook2, 1);
-    Assert.assertTrue(mgr.hasShutdownHook(hook1));
-    Assert.assertTrue(mgr.hasShutdownHook(hook2));
-    Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
-    Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
-    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
+    assertTrue(mgr.hasShutdownHook(hook1));
+    assertTrue(mgr.hasShutdownHook(hook2));
+    assertEquals(2, mgr.getShutdownHooksInOrder().size());
+    assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
+    assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
 
     // Test hook finish without timeout
     mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
-    Assert.assertTrue(mgr.hasShutdownHook(hook3));
-    Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
-    Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
-
-    // Test hook finish with timeout
-    mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
-    Assert.assertTrue(mgr.hasShutdownHook(hook4));
-    Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
-    Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
-    LOG.info("Shutdown starts here");
+    assertTrue(mgr.hasShutdownHook(hook3));
+    assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
+    assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
+
+    // Test hook finish with timeout; highest priority
+    int hook4timeout = 2;
+    mgr.addShutdownHook(hook4, 3, hook4timeout, TimeUnit.SECONDS);
+    assertTrue(mgr.hasShutdownHook(hook4));
+    assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
+    assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
+
+    // add a hook without a timeout and verify it gets the default one
+    mgr.addShutdownHook(hook5, 5);
+    ShutdownHookManager.HookEntry hookEntry5 = mgr.getShutdownHooksInOrder()
+        .get(0);
+    assertEquals(hook5, hookEntry5.getHook());
+    assertEquals("default timeout not used",
+        ShutdownHookManager.getShutdownTimeout(new Configuration()),
+        hookEntry5.getTimeout());
+    assertEquals("hook priority", 5, hookEntry5.getPriority());
+    // remove this to avoid a longer sleep in the test run
+    assertTrue("failed to remove " + hook5,
+        mgr.removeShutdownHook(hook5));
+
+
+    // now execute the hook shutdown sequence
+    INVOCATION_COUNT.set(0);
+    LOG.info("invoking executeShutdown()");
+    int timeouts = ShutdownHookManager.executeShutdown();
+    LOG.info("Shutdown completed");
+    assertEquals("Number of timed out hooks", 1, timeouts);
+
+    List<ShutdownHookManager.HookEntry> hooks
+        = mgr.getShutdownHooksInOrder();
+
+    // analyze the hooks
+    for (ShutdownHookManager.HookEntry entry : hooks) {
+      Hook hook = (Hook) entry.getHook();
+      assertTrue("Was not invoked " + hook, hook.invoked);
+      // did any hook raise an exception?
+      hook.maybeThrowAssertion();
+    }
+
+    // check the state of some of the invoked hooks
+    // hook4 was invoked first, but it timed out.
+    assertEquals("Expected to be invoked first " + hook4,
+        1, hook4.invokedOrder);
+    assertFalse("Expected to time out " + hook4, hook4.completed);
+
+
+    // hook1 completed, but it ran after the others, so its start time
+    // is the latest.
+    assertTrue("Expected to complete " + hook1, hook1.completed);
+    long invocationInterval = hook1.startTime - hook4.startTime;
+    assertTrue("invocation difference too short " + invocationInterval,
+        invocationInterval >= hook4timeout * 1000);
+    assertTrue("sleeping hook4 blocked other threads for " + invocationInterval,
+        invocationInterval < hook4.sleepTime);
+
+    // finally, clear the hooks
+    mgr.clearShutdownHooks();
+    // and verify that the hooks are empty
+    assertFalse(mgr.hasShutdownHook(hook1));
+    assertEquals("shutdown hook list is not empty",
+        0,
+        mgr.getShutdownHooksInOrder().size());
+  }
+
+  @Test
+  public void testShutdownTimeoutConfiguration() throws Throwable {
+    // set the shutdown timeout and verify it can be read back.
+    Configuration conf = new Configuration();
+    long shutdownTimeout = 5;
+    conf.setTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+        shutdownTimeout, TimeUnit.SECONDS);
+    assertEquals(SERVICE_SHUTDOWN_TIMEOUT,
+        shutdownTimeout,
+        ShutdownHookManager.getShutdownTimeout(conf));
+  }
+
+  /**
+   * Verify that low timeouts simply fall back to
+   * {@link ShutdownHookManager#TIMEOUT_MINIMUM}.
+   */
+  @Test
+  public void testShutdownTimeoutBadConfiguration() throws Throwable {
+    // set a sub-second timeout and verify it is raised to the minimum.
+    Configuration conf = new Configuration();
+    long shutdownTimeout = 50;
+    conf.setTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+        shutdownTimeout, TimeUnit.NANOSECONDS);
+    assertEquals(SERVICE_SHUTDOWN_TIMEOUT,
+        ShutdownHookManager.TIMEOUT_MINIMUM,
+        ShutdownHookManager.getShutdownTimeout(conf));
+  }
+
+  /**
+   * Verifies that a hook cannot be re-registered: an attempt to do so
+   * will simply be ignored.
+   */
+  @Test
+  public void testDuplicateRegistration() throws Throwable {
+    ShutdownHookManager mgr = ShutdownHookManager.get();
+    Hook hook = new Hook("hook1", 0, false);
+
+    // add the hook
+    mgr.addShutdownHook(hook, 2, 1, TimeUnit.SECONDS);
+
+    // add it at a higher priority. This will be ignored.
+    mgr.addShutdownHook(hook, 5);
+    List<ShutdownHookManager.HookEntry> hookList
+        = mgr.getShutdownHooksInOrder();
+    assertEquals("Hook added twice", 1, hookList.size());
+    ShutdownHookManager.HookEntry entry = hookList.get(0);
+    assertEquals("priority of hook", 2, entry.getPriority());
+    assertEquals("timeout of hook", 1, entry.getTimeout());
+
+    // remove the hook
+    assertTrue("failed to remove hook " + hook, mgr.removeShutdownHook(hook));
+    // which will fail a second time
+    assertFalse("expected hook removal to fail", mgr.removeShutdownHook(hook));
+
+    // now register it
+    mgr.addShutdownHook(hook, 5);
+    hookList = mgr.getShutdownHooksInOrder();
+    entry = hookList.get(0);
+    assertEquals("priority of hook", 5, entry.getPriority());
+    assertNotEquals("timeout of hook", 1, entry.getTimeout());
+
+  }
+
+  private static final AtomicInteger INVOCATION_COUNT = new AtomicInteger();
+
+  /**
+   * Hooks for testing; save state for ease of asserting on
+   * invocation.
+   */
+  private class Hook implements Runnable {
+
+    private final String name;
+    private final long sleepTime;
+    private final boolean expectFailure;
+    private AssertionError assertion;
+    private boolean invoked;
+    private int invokedOrder;
+    private boolean completed;
+    private boolean interrupted;
+    private long startTime;
+
+    Hook(final String name,
+        final long sleepTime,
+        final boolean expectFailure) {
+      this.name = name;
+      this.sleepTime = sleepTime;
+      this.expectFailure = expectFailure;
+    }
+
+    @Override
+    public void run() {
+      try {
+        invoked = true;
+        invokedOrder = INVOCATION_COUNT.incrementAndGet();
+        startTime = System.currentTimeMillis();
+        LOG.info("Starting shutdown of {} with sleep time of {}",
+            name, sleepTime);
+        if (sleepTime > 0) {
+          sleep(sleepTime);
+        }
+        LOG.info("Completed shutdown of {}", name);
+        completed = true;
+        if (expectFailure) {
+          assertion = new AssertionError("Expected a failure of " + name);
+        }
+      } catch (InterruptedException ex) {
+        LOG.info("Shutdown {} interrupted exception", name, ex);
+        interrupted = true;
+        if (!expectFailure) {
+          assertion = new AssertionError("Timeout of " + name, ex);
+        }
+      }
+      maybeThrowAssertion();
+    }
+
+    /**
+     * Raise any exception generated during the shutdown process.
+     * @throws AssertionError any assertion from the shutdown.
+     */
+    void maybeThrowAssertion() throws AssertionError {
+      if (assertion != null) {
+        throw assertion;
+      }
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("Hook{");
+      sb.append("name='").append(name).append('\'');
+      sb.append(", sleepTime=").append(sleepTime);
+      sb.append(", expectFailure=").append(expectFailure);
+      sb.append(", invoked=").append(invoked);
+      sb.append(", invokedOrder=").append(invokedOrder);
+      sb.append(", completed=").append(completed);
+      sb.append(", interrupted=").append(interrupted);
+      sb.append('}');
+      return sb.toString();
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org