Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/09 07:38:57 UTC

[GitHub] [kafka] tombentley opened a new pull request #11006: KAFKA-13049: Name the threads used for log recovery

tombentley opened a new pull request #11006:
URL: https://github.com/apache/kafka/pull/11006


   See [KAFKA-13049](https://issues.apache.org/jira/browse/KAFKA-13049)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668466658



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+          private val factory = Executors.defaultThreadFactory()
+          private val threadNumber = new AtomicInteger(1)

Review comment:
       It's only a means of giving the threads unique names when there is more than one thread per log dir. You can't really infer anything from the number though, so I could remove it if you want.
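For illustration, here is a minimal, self-contained sketch of the counter-based naming described in this comment. It mirrors the diff context quoted above, but the wrapper object `NamedRecoveryPool`, the helper `namedFactory`, and the path `/tmp/kafka-logs` are hypothetical names chosen for the example, and the final `thread` return is an assumption because the quoted hunk is cut off before it.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object NamedRecoveryPool {
  // Hypothetical helper: builds a factory whose threads are named
  // "log-recovery(<dir>, <n>)", with n counting up from 1 per factory.
  def namedFactory(logDirAbsolutePath: String): ThreadFactory = new ThreadFactory {
    private val factory = Executors.defaultThreadFactory()
    private val threadNumber = new AtomicInteger(1)

    override def newThread(r: Runnable): Thread = {
      val thread = factory.newThread(r)
      thread.setName(s"log-recovery($logDirAbsolutePath, ${threadNumber.getAndIncrement()})")
      thread // assumed: the quoted diff is truncated before the return value
    }
  }

  def main(args: Array[String]): Unit = {
    val f = namedFactory("/tmp/kafka-logs")
    val noop = new Runnable { def run(): Unit = () }
    // The threads are created but never started; only their names matter here.
    Seq(f.newThread(noop), f.newThread(noop)).foreach(t => println(t.getName))
  }
}
```

The number only distinguishes threads that belong to the same log dir; it carries no other meaning, which is why the comment offers to drop it.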







[GitHub] [kafka] showuon commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r666913262



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+          private val factory = Executors.defaultThreadFactory()
+          private val threadNumber = new AtomicInteger(1)
+          override def newThread(r: Runnable): Thread = {
+            val thread = factory.newThread(r)
+            thread.setName(s"log-recovery(${dir.getAbsolutePath}, ${threadNumber.getAndIncrement()})")

Review comment:
       We can replace `dir.getAbsolutePath` with the local variable `logDirAbsolutePath`.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+          private val factory = Executors.defaultThreadFactory()
+          private val threadNumber = new AtomicInteger(1)
+          override def newThread(r: Runnable): Thread = {
+            val thread = factory.newThread(r)
+            thread.setName(s"log-recovery(${dir.getAbsolutePath}, ${threadNumber.getAndIncrement()})")

Review comment:
       nit: Do you think it would be better to use a dash (-) to concatenate the path and threadNumber?
   `s"log-recovery(${dir.getAbsolutePath}-${threadNumber.getAndIncrement()})"`







[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r667692668



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+          private val factory = Executors.defaultThreadFactory()
+          private val threadNumber = new AtomicInteger(1)
+          override def newThread(r: Runnable): Thread = {
+            val thread = factory.newThread(r)
+            thread.setName(s"log-recovery(${dir.getAbsolutePath}, ${threadNumber.getAndIncrement()})")

Review comment:
       `logDirAbsolutePath`: Thanks!
   
   better to use dash (-): I don't have a very strong opinion, but a dash is more commonly used in path names than a comma. If the log dirs were named `data` and `data-2`, then (assuming >=2 threads per log dir) the threads could be named `data-1`, `data-2`, `data-2-1`, `data-2-2`, which is a bit confusing. People might see some problem associated with the thread `data-2` and think it was about the `data-2` directory, when it's really the 2nd thread processing the `data` directory.
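The ambiguity described here can be reproduced with a small sketch. The two naming functions below are hypothetical simplifications made up for the example (the dash form drops the `log-recovery` prefix, as the comment's own example does); they are not the code from the PR.

```scala
object ThreadNameAmbiguity {
  // Hypothetical dash-separated scheme: "<dir>-<n>".
  def dashName(dir: String, n: Int): String = s"$dir-$n"

  // Hypothetical comma-separated scheme, shaped like the name in the diff.
  def commaName(dir: String, n: Int): String = s"log-recovery($dir, $n)"

  // Every thread name produced for the given dirs and thread count.
  def allNames(dirs: Seq[String], threadsPerDir: Int, name: (String, Int) => String): Seq[String] =
    for (d <- dirs; n <- 1 to threadsPerDir) yield name(d, n)

  // Thread names that are indistinguishable from a log dir name.
  def clashes(dirs: Seq[String], names: Seq[String]): Seq[String] =
    names.filter(n => dirs.contains(n))

  def main(args: Array[String]): Unit = {
    val logDirs = Seq("data", "data-2")
    val dashed = allNames(logDirs, 2, dashName)
    val comma = allNames(logDirs, 2, commaName)
    println(dashed.mkString(", "))
    println("dash clashes: " + clashes(logDirs, dashed).mkString(", "))
    println("comma clashes: " + clashes(logDirs, comma).mkString(", "))
  }
}
```

With these inputs the dash scheme yields a thread called `data-2`, which is also the name of a log dir, while the comma scheme produces no such clash.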







[GitHub] [kafka] chia7712 commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668398858



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {
+          private val factory = Executors.defaultThreadFactory()
+          private val threadNumber = new AtomicInteger(1)

Review comment:
       Why do we need this counter? It seems to me `log-recovery` is good enough.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
       Line 495 (`shutdown`) also creates a thread pool. Does it need a better name as well?







[GitHub] [kafka] tombentley commented on pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#issuecomment-879596800


   Test failures are unrelated. 





[GitHub] [kafka] chia7712 commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668859528



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -406,6 +398,18 @@ class LogManager(logDirs: Seq[File],
     info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
   }
 
+  private def logDirExecutor(logDirAbsolutePath: String, threadNamePrefix: String): ExecutorService = {
+    Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
       How about using the following style?
   ```scala
   Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, KafkaThread.nonDaemon(s"$threadNamePrefix-$logDirAbsolutePath", _))
   ```
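As a rough sketch of what this style implies, the example below builds a pool whose factory has no counter, so every thread in the pool gets the same name. To stay self-contained it does not depend on Kafka: `nonDaemon` here is a local stand-in written for this example, not Kafka's `KafkaThread.nonDaemon`, and `SharedNamePool`, `factory`, and the path `/tmp/kafka-logs` are likewise hypothetical.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object SharedNamePool {
  // Local stand-in for the helper used in the suggestion:
  // creates a named, non-daemon thread for the given task.
  def nonDaemon(name: String, r: Runnable): Thread = {
    val t = new Thread(r, name)
    t.setDaemon(false)
    t
  }

  // Hypothetical factory: every thread gets "<prefix>-<dir>", with no counter.
  def factory(threadNamePrefix: String, logDirAbsolutePath: String): ThreadFactory =
    new ThreadFactory {
      override def newThread(r: Runnable): Thread =
        nonDaemon(s"$threadNamePrefix-$logDirAbsolutePath", r)
    }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2, factory("log-recovery", "/tmp/kafka-logs"))
    try {
      val task = new Runnable {
        def run(): Unit = println(Thread.currentThread().getName)
      }
      // Each task prints the name of whichever pool thread runs it.
      (1 to 4).foreach(_ => pool.execute(task))
    } finally {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

In this sketch the threads of one pool are told apart only by log dir, not by index, which matches the earlier remark that nothing can really be inferred from the number anyway.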







[GitHub] [kafka] tombentley commented on pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#issuecomment-878160534


   The test failures are unrelated.
   
   @chia7712 would you be able to review? 





[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668471349



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -312,7 +312,15 @@ class LogManager(logDirs: Seq[File],
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
       try {
-        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
+        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
       Yes, thanks! I've now factored out a common method.







[GitHub] [kafka] tombentley commented on pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#issuecomment-879198054


   Will merge on green CI.





[GitHub] [kafka] tombentley merged pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley merged pull request #11006:
URL: https://github.com/apache/kafka/pull/11006


   





[GitHub] [kafka] tombentley commented on a change in pull request #11006: KAFKA-13049: Name the threads used for log recovery

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11006:
URL: https://github.com/apache/kafka/pull/11006#discussion_r668888179



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -406,6 +398,18 @@ class LogManager(logDirs: Seq[File],
     info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
   }
 
+  private def logDirExecutor(logDirAbsolutePath: String, threadNamePrefix: String): ExecutorService = {
+    Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir, new ThreadFactory {

Review comment:
       Yeah, that's much better, thanks! I've inlined the method because it didn't add much. 



