You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/16 10:44:26 UTC

kafka git commit: MINOR: Consolidate Utils.newThread, Utils.daemonThread and KafkaThread

Repository: kafka
Updated Branches:
  refs/heads/trunk 54a3718a9 -> 6f1d4c693


MINOR: Consolidate Utils.newThread, Utils.daemonThread and KafkaThread

Removed the first two in favour of the latter.

Author: Kamal C <ka...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3350 from Kamal15/utilcleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f1d4c69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f1d4c69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f1d4c69

Branch: refs/heads/trunk
Commit: 6f1d4c693307d1c5d1a178d56e403bb7084d537f
Parents: 54a3718
Author: Kamal C <ka...@gmail.com>
Authored: Fri Jun 16 11:43:03 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 16 11:43:10 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/Metrics.java    |  3 ++-
 .../common/security/kerberos/KerberosLogin.java |  6 ++---
 .../apache/kafka/common/utils/KafkaThread.java  | 12 +++++++--
 .../org/apache/kafka/common/utils/Shell.java    |  4 +--
 .../org/apache/kafka/common/utils/Utils.java    | 28 --------------------
 .../main/scala/kafka/network/SocketServer.scala |  8 +++---
 .../kafka/server/KafkaRequestHandler.scala      |  4 +--
 .../scala/kafka/tools/SimpleConsumerShell.scala |  6 ++---
 core/src/main/scala/kafka/utils/CoreUtils.scala |  4 +--
 .../main/scala/kafka/utils/KafkaScheduler.scala |  4 +--
 .../main/scala/kafka/utils/timer/Timer.scala    |  4 +--
 11 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index c4cd676..0b4507b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class Metrics implements Closeable {
             // Creating a daemon thread to not block shutdown
             this.metricsScheduler.setThreadFactory(new ThreadFactory() {
                 public Thread newThread(Runnable runnable) {
-                    return Utils.newThread("SensorExpiryThread", runnable, true);
+                    return KafkaThread.daemon("SensorExpiryThread", runnable);
                 }
             });
             this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index fe30a01..a0aad54 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.authenticator.AbstractLogin;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Shell;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,7 +129,7 @@ public class KerberosLogin extends AbstractLogin {
         // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
         // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
         //  "modprinc -maxlife 3mins <principal>" in kadmin.
-        t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
+        t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
             public void run() {
                 log.info("[Principal={}]: TGT refresh thread started.", principal);
                 while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
@@ -253,7 +253,7 @@ public class KerberosLogin extends AbstractLogin {
                     }
                 }
             }
-        }, true);
+        });
         t.start();
         return loginContext;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
index 3eb025b..dd634a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -25,6 +25,14 @@ import org.slf4j.LoggerFactory;
 public class KafkaThread extends Thread {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
+    
+    public static KafkaThread daemon(final String name, Runnable runnable) {
+        return new KafkaThread(name, runnable, true);
+    }
+
+    public static KafkaThread nonDaemon(final String name, Runnable runnable) {
+        return new KafkaThread(name, runnable, false);
+    }
 
     public KafkaThread(final String name, boolean daemon) {
         super(name);
@@ -38,9 +46,9 @@ public class KafkaThread extends Thread {
 
     private void configureThread(final String name, boolean daemon) {
         setDaemon(daemon);
-        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
             public void uncaughtException(Thread t, Throwable e) {
-                log.error("Uncaught exception in " + name + ": ", e);
+                log.error("Uncaught exception in thread '{}':", name, e);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
index 33bf3c1..ebfd0ba 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
@@ -96,7 +96,7 @@ abstract public class Shell {
 
         // read error and input streams as this would free up the buffers
         // free the error stream buffer
-        Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() {
+        Thread errThread = KafkaThread.nonDaemon("kafka-shell-thread", new Runnable() {
             @Override
             public void run() {
                 try {
@@ -110,7 +110,7 @@ abstract public class Shell {
                     LOG.warn("Error reading the error stream", ioe);
                 }
             }
-        }, false);
+        });
         errThread.start();
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9fbc387..21fbaf4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -437,34 +437,6 @@ public class Utils {
     }
 
     /**
-     * Create a new thread
-     * @param name The name of the thread
-     * @param runnable The work for the thread to do
-     * @param daemon Should the thread block JVM shutdown?
-     * @return The unstarted thread
-     */
-    public static Thread newThread(String name, Runnable runnable, boolean daemon) {
-        Thread thread = new Thread(runnable, name);
-        thread.setDaemon(daemon);
-        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            public void uncaughtException(Thread t, Throwable e) {
-                log.error("Uncaught exception in thread '{}':", t.getName(), e);
-            }
-        });
-        return thread;
-    }
-
-    /**
-     * Create a daemon thread
-     * @param name The name of the thread
-     * @param runnable The runnable to execute in the background
-     * @return The unstarted thread
-     */
-    public static Thread daemonThread(String name, Runnable runnable) {
-        return newThread(name, runnable, true);
-    }
-
-    /**
      * Print an error message and shutdown the JVM
      * @param message The error message
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 414557e..9088eb5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerN
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
 
 import scala.collection._
 import JavaConverters._
@@ -91,7 +91,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
           processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
         acceptors.put(endpoint, acceptor)
-        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
+        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
         acceptor.awaitStartup()
 
         processorBeginIndex = processorEndIndex
@@ -253,8 +253,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   this.synchronized {
     processors.foreach { processor =>
-      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
-        processor, false).start()
+      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+        processor).start()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 9983e3d..feb07b8 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Meter
 import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
 
 /**
  * A thread that answers kafka requests.
@@ -92,7 +92,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) {
     runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
-    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
+    KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
   }
 
   def shutdown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b3643a3..da8b698 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -27,7 +27,7 @@ import kafka.cluster.BrokerEndPoint
 import scala.collection.JavaConverters._
 import kafka.common.{MessageFormatter, TopicAndPartition}
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
@@ -201,7 +201,7 @@ object SimpleConsumerShell extends Logging {
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
                                             fetchTargetBroker.port,
                                             10000, 64*1024, clientId)
-    val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
+    val thread = KafkaThread.nonDaemon("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset
         var numMessagesConsumed = 0
@@ -253,7 +253,7 @@ object SimpleConsumerShell extends Logging {
           info(s"Consumed $numMessagesConsumed messages")
         }
       }
-    }, false)
+    })
     thread.start()
     thread.join()
     System.out.flush()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 9ac95b5..0e8855c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -32,7 +32,7 @@ import scala.collection._
 import scala.collection.mutable
 import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
 
 /**
  * General helper functions!
@@ -66,7 +66,7 @@ object CoreUtils extends Logging {
     * @return The unstarted thread
     */
   def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread =
-    Utils.newThread(name, runnable(fun), daemon)
+    new KafkaThread(name, runnable(fun), daemon)
 
   /**
    * Do the given action and log any exceptions thrown without rethrowing them

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index ef60c45..8e130cf 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -19,7 +19,7 @@ package kafka.utils
 
 import java.util.concurrent._
 import atomic._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.KafkaThread
 
 /**
  * A scheduler for running jobs
@@ -81,7 +81,7 @@ class KafkaScheduler(val threads: Int,
       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
       executor.setThreadFactory(new ThreadFactory() {
                                   def newThread(runnable: Runnable): Thread = 
-                                    Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
+                                    new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                                 })
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index 0538271..ae8caff 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.threadsafe
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
 
 trait Timer {
   /**
@@ -60,7 +60,7 @@ class SystemTimer(executorName: String,
   // timeout timer
   private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
     def newThread(runnable: Runnable): Thread =
-      Utils.newThread("executor-"+executorName, runnable, false)
+      KafkaThread.nonDaemon("executor-"+executorName, runnable)
   })
 
   private[this] val delayQueue = new DelayQueue[TimerTaskList]()