You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/16 10:44:26 UTC
kafka git commit: MINOR: Consolidate Utils.newThread,
Utils.daemonThread and KafkaThread
Repository: kafka
Updated Branches:
refs/heads/trunk 54a3718a9 -> 6f1d4c693
MINOR: Consolidate Utils.newThread, Utils.daemonThread and KafkaThread
Removed the first two in favour of the latter.
Author: Kamal C <ka...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3350 from Kamal15/utilcleanup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f1d4c69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f1d4c69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f1d4c69
Branch: refs/heads/trunk
Commit: 6f1d4c693307d1c5d1a178d56e403bb7084d537f
Parents: 54a3718
Author: Kamal C <ka...@gmail.com>
Authored: Fri Jun 16 11:43:03 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 16 11:43:10 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/common/metrics/Metrics.java | 3 ++-
.../common/security/kerberos/KerberosLogin.java | 6 ++---
.../apache/kafka/common/utils/KafkaThread.java | 12 +++++++--
.../org/apache/kafka/common/utils/Shell.java | 4 +--
.../org/apache/kafka/common/utils/Utils.java | 28 --------------------
.../main/scala/kafka/network/SocketServer.scala | 8 +++---
.../kafka/server/KafkaRequestHandler.scala | 4 +--
.../scala/kafka/tools/SimpleConsumerShell.scala | 6 ++---
core/src/main/scala/kafka/utils/CoreUtils.scala | 4 +--
.../main/scala/kafka/utils/KafkaScheduler.scala | 4 +--
.../main/scala/kafka/utils/timer/Timer.scala | 4 +--
11 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index c4cd676..0b4507b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class Metrics implements Closeable {
// Creating a daemon thread to not block shutdown
this.metricsScheduler.setThreadFactory(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- return Utils.newThread("SensorExpiryThread", runnable, true);
+ return KafkaThread.daemon("SensorExpiryThread", runnable);
}
});
this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index fe30a01..a0aad54 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +129,7 @@ public class KerberosLogin extends AbstractLogin {
// TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
// you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
// "modprinc -maxlife 3mins <principal>" in kadmin.
- t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
+ t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
public void run() {
log.info("[Principal={}]: TGT refresh thread started.", principal);
while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
@@ -253,7 +253,7 @@ public class KerberosLogin extends AbstractLogin {
}
}
}
- }, true);
+ });
t.start();
return loginContext;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
index 3eb025b..dd634a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -25,6 +25,14 @@ import org.slf4j.LoggerFactory;
public class KafkaThread extends Thread {
private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public static KafkaThread daemon(final String name, Runnable runnable) {
+ return new KafkaThread(name, runnable, true);
+ }
+
+ public static KafkaThread nonDaemon(final String name, Runnable runnable) {
+ return new KafkaThread(name, runnable, false);
+ }
public KafkaThread(final String name, boolean daemon) {
super(name);
@@ -38,9 +46,9 @@ public class KafkaThread extends Thread {
private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
- setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
- log.error("Uncaught exception in " + name + ": ", e);
+ log.error("Uncaught exception in thread '{}':", name, e);
}
});
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
index 33bf3c1..ebfd0ba 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java
@@ -96,7 +96,7 @@ abstract public class Shell {
// read error and input streams as this would free up the buffers
// free the error stream buffer
- Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() {
+ Thread errThread = KafkaThread.nonDaemon("kafka-shell-thread", new Runnable() {
@Override
public void run() {
try {
@@ -110,7 +110,7 @@ abstract public class Shell {
LOG.warn("Error reading the error stream", ioe);
}
}
- }, false);
+ });
errThread.start();
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9fbc387..21fbaf4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -437,34 +437,6 @@ public class Utils {
}
/**
- * Create a new thread
- * @param name The name of the thread
- * @param runnable The work for the thread to do
- * @param daemon Should the thread block JVM shutdown?
- * @return The unstarted thread
- */
- public static Thread newThread(String name, Runnable runnable, boolean daemon) {
- Thread thread = new Thread(runnable, name);
- thread.setDaemon(daemon);
- thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- log.error("Uncaught exception in thread '{}':", t.getName(), e);
- }
- });
- return thread;
- }
-
- /**
- * Create a daemon thread
- * @param name The name of the thread
- * @param runnable The runnable to execute in the background
- * @return The unstarted thread
- */
- public static Thread daemonThread(String name, Runnable runnable) {
- return newThread(name, runnable, true);
- }
-
- /**
* Print an error message and shutdown the JVM
* @param message The error message
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 414557e..9088eb5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerN
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
import scala.collection._
import JavaConverters._
@@ -91,7 +91,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
- Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
+ KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
@@ -253,8 +253,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
this.synchronized {
processors.foreach { processor =>
- Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
- processor, false).start()
+ KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+ processor).start()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 9983e3d..feb07b8 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
/**
* A thread that answers kafka requests.
@@ -92,7 +92,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
- Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
+ KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
}
def shutdown() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b3643a3..da8b698 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -27,7 +27,7 @@ import kafka.cluster.BrokerEndPoint
import scala.collection.JavaConverters._
import kafka.common.{MessageFormatter, TopicAndPartition}
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
/**
* Command line program to dump out messages to standard out using the simple consumer
@@ -201,7 +201,7 @@ object SimpleConsumerShell extends Logging {
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
fetchTargetBroker.port,
10000, 64*1024, clientId)
- val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
+ val thread = KafkaThread.nonDaemon("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
var numMessagesConsumed = 0
@@ -253,7 +253,7 @@ object SimpleConsumerShell extends Logging {
info(s"Consumed $numMessagesConsumed messages")
}
}
- }, false)
+ })
thread.start()
thread.join()
System.out.flush()
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 9ac95b5..0e8855c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -32,7 +32,7 @@ import scala.collection._
import scala.collection.mutable
import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
/**
* General helper functions!
@@ -66,7 +66,7 @@ object CoreUtils extends Logging {
* @return The unstarted thread
*/
def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread =
- Utils.newThread(name, runnable(fun), daemon)
+ new KafkaThread(name, runnable(fun), daemon)
/**
* Do the given action and log any exceptions thrown without rethrowing them
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index ef60c45..8e130cf 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -19,7 +19,7 @@ package kafka.utils
import java.util.concurrent._
import atomic._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.KafkaThread
/**
* A scheduler for running jobs
@@ -81,7 +81,7 @@ class KafkaScheduler(val threads: Int,
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
executor.setThreadFactory(new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
- Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
+ new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
})
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f1d4c69/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index 0538271..ae8caff 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.threadsafe
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time}
trait Timer {
/**
@@ -60,7 +60,7 @@ class SystemTimer(executorName: String,
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
- Utils.newThread("executor-"+executorName, runnable, false)
+ KafkaThread.nonDaemon("executor-"+executorName, runnable)
})
private[this] val delayQueue = new DelayQueue[TimerTaskList]()