You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/09 13:43:50 UTC

[kafka] branch 3.6 updated: MINOR: Removed the RSM and RLMM classpath config validator (#14358)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 2a56edc0ea7 MINOR: Removed the RSM and RLMM classpath config validator (#14358)
2a56edc0ea7 is described below

commit 2a56edc0ea76612c011254302fbc4d5af9ba461b
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Sat Sep 9 19:02:42 2023 +0530

    MINOR: Removed the RSM and RLMM classpath config validator (#14358)
    
    - The RSM and RLMM classpath configs are optional and may be empty, so the non-empty string validator was removed from both.
    - Fixed looking up the `LocalTieredStorage` for a broker after a broker has been stopped: it is now found by matching its broker id instead of by list index.
    
    Reviewers: Christo Lolov <lo...@amazon.com>, Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>
---
 .../server/log/remote/storage/RemoteLogManagerConfig.java     |  4 ++--
 .../kafka/server/log/remote/storage/LocalTieredStorage.java   |  4 ++++
 .../apache/kafka/tiered/storage/TieredStorageTestContext.java | 11 +++++++++--
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index f6363639713..32dcfe37311 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -171,7 +171,7 @@ public final class RemoteLogManagerConfig {
                                   REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
                   .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
                                   null,
-                                  new ConfigDef.NonEmptyString(),
+                                  null,
                                   MEDIUM,
                                   REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
                   .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
@@ -183,7 +183,7 @@ public final class RemoteLogManagerConfig {
                   .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
                                   STRING,
                                   null,
-                                  new ConfigDef.NonEmptyString(),
+                                  null,
                                   MEDIUM,
                                   REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
                   .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 43c09ccd908..64131d15559 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -561,4 +561,8 @@ public final class LocalTieredStorage implements RemoteStorageManager {
         }
         return SEGMENT;
     }
+
+    public int brokerId() {
+        return brokerId;
+    }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 1975a1690cf..59acae74ad3 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -271,11 +271,18 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
         int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
-        return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManagers.get(aliveBrokerId));
+        return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
     }
 
     public LocalTieredStorageHistory tieredStorageHistory(int brokerId) {
-        return remoteStorageManagers.get(brokerId).getHistory();
+        return remoteStorageManager(brokerId).getHistory();
+    }
+
+    public LocalTieredStorage remoteStorageManager(int brokerId) {
+        return remoteStorageManagers.stream()
+                .filter(rsm -> rsm.brokerId() == brokerId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId));
     }
 
     public List<LocalTieredStorage> remoteStorageManagers() {