You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "junrao (via GitHub)" <gi...@apache.org> on 2023/04/05 20:24:32 UTC
[GitHub] [kafka] junrao commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
junrao commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1158975829
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,97 @@
package kafka.log.remote
import kafka.cluster.Partition
+import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.common._
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
import java.security.{AccessController, PrivilegedAction}
-import java.util
import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
import scala.collection.Set
import scala.jdk.CollectionConverters._
+class RLMScheduledThreadPool(poolSize: Int) extends Logging { // Thin wrapper around a ScheduledThreadPoolExecutor created with `poolSize` core threads.
+ // Underlying executor: cancelled tasks are removed from its queue, and neither delayed nor periodic tasks keep running after shutdown.
+ private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+ val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+ threadPool.setRemoveOnCancelPolicy(true)
+ threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+ threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+ threadPool.setThreadFactory(new ThreadFactory {
+ private val sequence = new AtomicInteger()
+ // Worker names are "kafka-rlm-thread-pool-<n>"; n starts at 1 because incrementAndGet() runs on a fresh AtomicInteger.
+ override def newThread(r: Runnable): Thread = {
+ KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+ }
+ })
+ // The configured executor is the value of this initializer block.
+ threadPool
+ }
+ // Logs the old and new sizes, then sets the executor's core pool size to `size`.
+ def resizePool(size: Int): Unit = {
+ info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+ scheduledThreadPool.setCorePoolSize(size)
+ }
+ // Returns the executor's current core pool size (reflects any earlier resizePool call).
+ def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+ // Returns 1 - activeCount / corePoolSize, i.e. a fraction rather than a value scaled to 100. NOTE(review): the result is non-finite when the core pool size is 0 - confirm callers never resize to 0.
+ def getIdlePercent(): Double = {
+ 1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double]
+ }
+ // Logs the request and delegates to the executor's scheduleWithFixedDelay, returning its ScheduledFuture. NOTE(review): the log message omits `timeUnit`.
+ def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long,
+ timeUnit: TimeUnit): ScheduledFuture[_] = {
+ info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay")
+ scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit)
+ }
+ // Calls shutdownNow() and then blocks until the pool terminates or the timeout below elapses; returns the awaitTermination result.
+ def shutdown(): Boolean = {
+ info("Shutting down scheduled thread pool")
+ scheduledThreadPool.shutdownNow()
+ // Wait up to 2 minutes for running tasks to finish; awaitTermination returns false if the timeout elapses first.
+ scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES)
+ }
+}
+
/**
* This class is responsible for
- * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
* - receives any leader and follower replica events and partition stop events and act on them
- * - also provides APIs to fetch indexes, metadata about remote log segments.
+ * - also provides APIs to fetch indexes, metadata about remote log segments
+ * - copying log segments to remote storage
*
* @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param brokerId id of the current broker.
* @param logDir directory of Kafka log segments.
+ * @param time Time instance.
Review Comment:
Since this is a relatively new class and we are in the process of migrating Scala classes to Java, would it be better to write this class in Java but still keep it in the core module? This way, we don't need to write the new code twice: once in Scala and again in Java.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org