You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/09 13:43:50 UTC
[kafka] branch 3.6 updated: MINOR: Removed the RSM and RLMM classpath config validator (#14358)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 2a56edc0ea7 MINOR: Removed the RSM and RLMM classpath config validator (#14358)
2a56edc0ea7 is described below
commit 2a56edc0ea76612c011254302fbc4d5af9ba461b
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Sat Sep 9 19:02:42 2023 +0530
MINOR: Removed the RSM and RLMM classpath config validator (#14358)
- RSM and RLMM classpath can be empty since it's optional so removed the non-empty string validator
- Fix getting the `localTieredStorage` by brokerId after stopping a broker.
Reviewers: Christo Lolov <lo...@amazon.com>, Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>
---
.../server/log/remote/storage/RemoteLogManagerConfig.java | 4 ++--
.../kafka/server/log/remote/storage/LocalTieredStorage.java | 4 ++++
.../apache/kafka/tiered/storage/TieredStorageTestContext.java | 11 +++++++++--
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index f6363639713..32dcfe37311 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -171,7 +171,7 @@ public final class RemoteLogManagerConfig {
REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
.define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
null,
- new ConfigDef.NonEmptyString(),
+ null,
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
@@ -183,7 +183,7 @@ public final class RemoteLogManagerConfig {
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
STRING,
null,
- new ConfigDef.NonEmptyString(),
+ null,
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
.define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 43c09ccd908..64131d15559 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -561,4 +561,8 @@ public final class LocalTieredStorage implements RemoteStorageManager {
}
return SEGMENT;
}
+
+ public int brokerId() {
+ return brokerId;
+ }
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 1975a1690cf..59acae74ad3 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -271,11 +271,18 @@ public final class TieredStorageTestContext implements AutoCloseable {
public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
- return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManagers.get(aliveBrokerId));
+ return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
}
public LocalTieredStorageHistory tieredStorageHistory(int brokerId) {
- return remoteStorageManagers.get(brokerId).getHistory();
+ return remoteStorageManager(brokerId).getHistory();
+ }
+
+ public LocalTieredStorage remoteStorageManager(int brokerId) {
+ return remoteStorageManagers.stream()
+ .filter(rsm -> rsm.brokerId() == brokerId)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId));
}
public List<LocalTieredStorage> remoteStorageManagers() {