You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/06/08 14:19:27 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment:
   please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {

Review Comment:
   missing property `cluster.id` as per https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+        remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

Review Comment:
   s/remoteLogManager/remoteLogManagerOpt



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -504,6 +506,13 @@ class KafkaServer(
           new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
+        remoteLogManager.foreach(rlm => {
+          val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+          val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+            .orElse(Some(brokerInfo.broker.endPoints.head))

Review Comment:
   This means that the endpoint will never be optional (since we pick the first broker endpoint when it's not configured), right? In that case, can we please make it mandatory in RemoteLogManager?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
+            rlmmProps.put("security.protocol", e.securityProtocol().name);

Review Comment:
   please use the constant `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   should we create this only when `remoteLogManagerConfig.enableRemoteStorageSystem()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org