You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/09/25 20:19:08 UTC

svn commit: r1390035 - in /hbase/trunk/hbase-common/src: main/java/org/apache/hadoop/hbase/util/Threads.java test/java/org/apache/hadoop/hbase/util/TestThreads.java

Author: larsh
Date: Tue Sep 25 18:19:08 2012
New Revision: 1390035

URL: http://svn.apache.org/viewvc?rev=1390035&view=rev
Log:
HBASE-6637 Missed new files

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1390035&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Tue Sep 25 18:19:08 2012
@@ -0,0 +1,232 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.PrintWriter;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Thread Utility
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Threads {
+  protected static final Log LOG = LogFactory.getLog(Threads.class);
+  private static final AtomicInteger poolNumber = new AtomicInteger(1);
+  /**
+   * Utility method that sets name, daemon status and starts passed thread.
+   * @param t thread to run
+   * @return Returns the passed Thread <code>t</code>.
+   */
+  public static Thread setDaemonThreadRunning(final Thread t) {
+    return setDaemonThreadRunning(t, t.getName());
+  }
+
+  /**
+   * Utility method that sets name, daemon status and starts passed thread.
+   * @param t thread to frob
+   * @param name new name
+   * @return Returns the passed Thread <code>t</code>.
+   */
+  public static Thread setDaemonThreadRunning(final Thread t,
+    final String name) {
+    return setDaemonThreadRunning(t, name, null);
+  }
+
+  /**
+   * Utility method that sets name, daemon status and starts passed thread.
+   * @param t thread to frob
+   * @param name new name
+   * @param handler A handler to set on the thread.  Pass null if want to
+   * use default handler.
+   * @return Returns the passed Thread <code>t</code>.
+   */
+  public static Thread setDaemonThreadRunning(final Thread t,
+    final String name, final UncaughtExceptionHandler handler) {
+    t.setName(name);
+    if (handler != null) {
+      t.setUncaughtExceptionHandler(handler);
+    }
+    t.setDaemon(true);
+    t.start();
+    return t;
+  }
+
+  /**
+   * Shutdown passed thread using isAlive and join.
+   * @param t Thread to shutdown
+   */
+  public static void shutdown(final Thread t) {
+    shutdown(t, 0);
+  }
+
+  /**
+   * Shutdown passed thread using isAlive and join.
+   * @param joinwait Pass 0 if we're to wait forever.
+   * @param t Thread to shutdown
+   */
+  public static void shutdown(final Thread t, final long joinwait) {
+    if (t == null) return;
+    while (t.isAlive()) {
+      try {
+        t.join(joinwait);
+      } catch (InterruptedException e) {
+        LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
+      }
+    }
+  }
+
+
+  /**
+   * @param t Waits on the passed thread to die dumping a threaddump every
+   * minute while its up.
+   * @throws InterruptedException
+   */
+  public static void threadDumpingIsAlive(final Thread t)
+  throws InterruptedException {
+    if (t == null) {
+      return;
+    }
+
+    while (t.isAlive()) {
+      t.join(60 * 1000);
+      if (t.isAlive()) {
+        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+            "Automatic Stack Trace every 60 seconds waiting on " +
+            t.getName());
+      }
+    }
+  }
+
+  /**
+   * @param millis How long to sleep for in milliseconds.
+   */
+  public static void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Sleeps for the given amount of time even if interrupted. Preserves
+   * the interrupt status.
+   * @param msToWait the amount of time to sleep in milliseconds
+   */
+  public static void sleepWithoutInterrupt(final long msToWait) {
+    long timeMillis = System.currentTimeMillis();
+    long endTime = timeMillis + msToWait;
+    boolean interrupted = false;
+    while (timeMillis < endTime) {
+      try {
+        Thread.sleep(endTime - timeMillis);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+      timeMillis = System.currentTimeMillis();
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Create a new CachedThreadPool with a bounded number as the maximum 
+   * thread size in the pool.
+   * 
+   * @param maxCachedThread the maximum thread could be created in the pool
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   * @param threadFactory the factory to use when creating new threads
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
+   * as the maximum thread size in the pool. 
+   */
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(
+      int maxCachedThread, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor boundedCachedThreadPool =
+      new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
+    // allow the core pool threads timeout and terminate
+    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+    return boundedCachedThreadPool;
+  }
+  
+  
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
+   * with a common prefix.
+   * @param prefix The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+    SecurityManager s = System.getSecurityManager();
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+        .getThreadGroup();
+
+    return new ThreadFactory() {
+      final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final int poolNumber = Threads.poolNumber.getAndIncrement();
+      final ThreadGroup group = threadGroup;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        final String name = prefix + "-pool-" + poolNumber + "-thread-"
+            + threadNumber.getAndIncrement();
+        return new Thread(group, r, name);
+      }
+    };
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads
+   * @param prefix name prefix for all threads created from the factory
+   * @return a thread factory that creates named, daemon threads
+   */
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+    return new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = namedFactory.newThread(r);
+        if (!t.isDaemon()) {
+          t.setDaemon(true);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+          t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+      }
+
+    };
+  }
+}

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java?rev=1390035&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java Tue Sep 25 18:19:08 2012
@@ -0,0 +1,74 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class TestThreads {
+  private static final Log LOG = LogFactory.getLog(TestThreads.class);
+
+  private static final int SLEEP_TIME_MS = 5000;
+  private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS);
+
+  private volatile boolean wasInterrupted;
+
+  @Test(timeout=6000)
+  public void testSleepWithoutInterrupt() throws InterruptedException {
+    Thread sleeper = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS);
+        Threads.sleepWithoutInterrupt(SLEEP_TIME_MS);
+        LOG.debug("Sleeper thread: finished sleeping");
+        wasInterrupted = Thread.currentThread().isInterrupted();
+      }
+    });
+    LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)");
+    sleeper.start();
+    long startTime = System.currentTimeMillis();
+    LOG.debug("Main thread: sleeping for 500 ms");
+    Threads.sleep(500);
+
+    LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms");
+    sleeper.interrupt();
+    Threads.sleep(2000);
+
+    LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms");
+    sleeper.interrupt();
+    Threads.sleep(1000);
+
+    LOG.debug("Interrupting the sleeper thread again");
+    sleeper.interrupt();
+    sleeper.join();
+
+    assertTrue("sleepWithoutInterrupt did not preserve the thread's " +
+        "interrupted status", wasInterrupted);
+
+    long timeElapsed = System.currentTimeMillis() - startTime;
+    assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " +
+        "range of the sleep time " + SLEEP_TIME_MS,
+        Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS);
+    LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " +
+        timeElapsed);
+  }
+}
\ No newline at end of file