Posted to jira@kafka.apache.org by "junrao (via GitHub)" <gi...@apache.org> on 2023/04/05 20:24:32 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

junrao commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1158975829


##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,97 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
 import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+    val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+    threadPool.setRemoveOnCancelPolicy(true)
+    threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+    threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+    threadPool.setThreadFactory(new ThreadFactory {
+      private val sequence = new AtomicInteger()
+
+      override def newThread(r: Runnable): Thread = {
+        KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+      }
+    })
+
+    threadPool
+  }
+
+  def resizePool(size: Int): Unit = {
+    info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+    scheduledThreadPool.setCorePoolSize(size)
+  }
+
+  def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+
+  def getIdlePercent(): Double = {
+    1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double]
+  }
+
+  def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long,
+                             timeUnit: TimeUnit): ScheduledFuture[_] = {
+    info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay")
+    scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit)
+  }
+
+  def shutdown(): Boolean = {
+    info("Shutting down scheduled thread pool")
+    scheduledThreadPool.shutdownNow()
+    // Wait up to 2 minutes for in-flight tasks to terminate
+    scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES)
+  }
+}
+
 /**
  * This class is responsible for
- *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
 *  - receives any leader and follower replica events and partition stop events and acts on them
- *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *  - also provides APIs to fetch indexes and metadata about remote log segments
+ *  - copying log segments to remote storage
  *
 * @param rlmConfig Configuration required for the remote logging subsystem (tiered storage) at the broker level.
  * @param brokerId  id of the current broker.
  * @param logDir    directory of Kafka log segments.
+ * @param time      Time instance.

Review Comment:
   Since this is a relatively new class and we are in the process of migrating Scala classes to Java, would it be better to write this class in Java but still keep it in the core module? This way, we don't need to write the new code twice, once in Scala and again in Java.
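
   For illustration only, a minimal sketch of what such a Java port of the `RLMScheduledThreadPool` helper from the diff above might look like. This is not code from the PR; it mirrors the Scala version one-to-one, drops the `Logging` mixin (and its `info` calls) for brevity, and assumes the same class and package names carry over:

   ```java
   // Hypothetical sketch of the suggested Java port -- not code from the PR.
   package kafka.log.remote;

   import org.apache.kafka.common.utils.KafkaThread;

   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   public class RLMScheduledThreadPool {

       private final ScheduledThreadPoolExecutor scheduledThreadPool;

       public RLMScheduledThreadPool(int poolSize) {
           ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
           // Mirror the pool configuration from the Scala version above.
           threadPool.setRemoveOnCancelPolicy(true);
           threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
           threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
           final AtomicInteger sequence = new AtomicInteger();
           threadPool.setThreadFactory(r ->
               KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r));
           this.scheduledThreadPool = threadPool;
       }

       public void resizePool(int size) {
           scheduledThreadPool.setCorePoolSize(size);
       }

       public int poolSize() {
           return scheduledThreadPool.getCorePoolSize();
       }

       public double getIdlePercent() {
           return 1 - (double) scheduledThreadPool.getActiveCount() / scheduledThreadPool.getCorePoolSize();
       }

       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay,
                                                        long delay, TimeUnit timeUnit) {
           return scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
       }

       public boolean shutdown() throws InterruptedException {
           scheduledThreadPool.shutdownNow();
           // Wait up to 2 minutes for in-flight tasks to terminate, as in the Scala version.
           return scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES);
       }
   }
   ```

   A port along these lines would behave the same as the Scala helper; the benefit the comment points at is avoiding a second rewrite of the same code later, once the ongoing Scala-to-Java migration reaches this part of the core module.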


