You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/20 06:35:08 UTC
[rocketmq] branch develop updated: [ISSUE #5089] Delegate plugin store configuration by broker
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5170e5ba [ISSUE #5089] Delegate plugin store configuration by broker
d5170e5ba is described below
commit d5170e5ba374db7f6db3652477aa48b2ce28cbf8
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Tue Sep 20 14:34:54 2022 +0800
[ISSUE #5089] Delegate plugin store configuration by broker
---
.../apache/rocketmq/broker/BrokerController.java | 6 +-
.../store}/plugin/AbstractPluginMessageStore.java | 2 +-
.../store}/plugin/MessageStoreFactory.java | 91 ++++++++-------
.../store}/plugin/MessageStorePluginContext.java | 126 +++++++++++----------
4 files changed, 114 insertions(+), 111 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e0262a105..6c79559c3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -69,8 +69,8 @@ import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
-import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
-import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
+import org.apache.rocketmq.store.plugin.MessageStoreFactory;
+import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
@@ -729,7 +729,7 @@ public class BrokerController {
}
this.brokerStats = new BrokerStats(defaultMessageStore);
//load plugin
- MessageStorePluginContext context = new MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, messageArrivingListener);
+ MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (this.brokerConfig.isEnableControllerMode()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
similarity index 99%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 3164337f7..47f6cc9d7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.broker.plugin;
+package org.apache.rocketmq.store.plugin;
import java.nio.ByteBuffer;
import java.util.HashMap;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
similarity index 87%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
index b64ab5a40..8d929ea56 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
@@ -1,46 +1,45 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.plugin;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import org.apache.rocketmq.store.MessageStore;
-
-public final class MessageStoreFactory {
- public final static MessageStore build(MessageStorePluginContext context,
- MessageStore messageStore) throws IOException {
- String plugin = context.getBrokerConfig().getMessageStorePlugIn();
- if (plugin != null && plugin.trim().length() != 0) {
- String[] pluginClasses = plugin.split(",");
- for (int i = pluginClasses.length - 1; i >= 0; --i) {
- String pluginClass = pluginClasses[i];
- try {
- @SuppressWarnings("unchecked")
- Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass);
- Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
- AbstractPluginMessageStore pluginMessageStore = construct.newInstance(context, messageStore);
- messageStore = pluginMessageStore;
- }
- catch (Throwable e) {
- throw new RuntimeException("Initialize plugin's class: " + pluginClass + " not found!", e);
- }
- }
- }
- return messageStore;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.plugin;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import org.apache.rocketmq.store.MessageStore;
+
+public final class MessageStoreFactory {
+ public static MessageStore build(MessageStorePluginContext context,
+ MessageStore messageStore) throws IOException {
+ String plugin = context.getBrokerConfig().getMessageStorePlugIn();
+ if (plugin != null && plugin.trim().length() != 0) {
+ String[] pluginClasses = plugin.split(",");
+ for (int i = pluginClasses.length - 1; i >= 0; --i) {
+ String pluginClass = pluginClasses[i];
+ try {
+ @SuppressWarnings("unchecked")
+ Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
+ Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
+ AbstractPluginMessageStore pluginMessageStore = construct.newInstance(context, messageStore);
+ messageStore = pluginMessageStore;
+ } catch (Throwable e) {
+ throw new RuntimeException("Initialize plugin's class: " + pluginClass + " not found!", e);
+ }
+ }
+ }
+ return messageStore;
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
similarity index 73%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
index c132cf93d..b10f9f2b5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
@@ -1,61 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.plugin;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.store.MessageArrivingListener;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
-public class MessageStorePluginContext {
- private BrokerController controller;
- private MessageStoreConfig messageStoreConfig;
- private BrokerStatsManager brokerStatsManager;
- private MessageArrivingListener messageArrivingListener;
-
- public MessageStorePluginContext(BrokerController controller, MessageStoreConfig messageStoreConfig,
- BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener) {
- super();
- this.messageStoreConfig = messageStoreConfig;
- this.brokerStatsManager = brokerStatsManager;
- this.messageArrivingListener = messageArrivingListener;
- this.controller = controller;
- }
-
- public MessageStoreConfig getMessageStoreConfig() {
- return messageStoreConfig;
- }
-
- public BrokerStatsManager getBrokerStatsManager() {
- return brokerStatsManager;
- }
-
- public MessageArrivingListener getMessageArrivingListener() {
- return messageArrivingListener;
- }
-
- public BrokerConfig getBrokerConfig() {
- return controller.getBrokerConfig();
- }
-
- public BrokerController getController() {
- return controller;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.plugin;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class MessageStorePluginContext {
+ private MessageStoreConfig messageStoreConfig;
+ private BrokerStatsManager brokerStatsManager;
+ private MessageArrivingListener messageArrivingListener;
+ private BrokerConfig brokerConfig;
+ private final Configuration configuration;
+
+ public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
+ BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
+ BrokerConfig brokerConfig, Configuration configuration) {
+ super();
+ this.messageStoreConfig = messageStoreConfig;
+ this.brokerStatsManager = brokerStatsManager;
+ this.messageArrivingListener = messageArrivingListener;
+ this.brokerConfig = brokerConfig;
+ this.configuration = configuration;
+ }
+
+ public MessageStoreConfig getMessageStoreConfig() {
+ return messageStoreConfig;
+ }
+
+ public BrokerStatsManager getBrokerStatsManager() {
+ return brokerStatsManager;
+ }
+
+ public MessageArrivingListener getMessageArrivingListener() {
+ return messageArrivingListener;
+ }
+
+ public BrokerConfig getBrokerConfig() {
+ return brokerConfig;
+ }
+
+ public void registerConfiguration(Object config) {
+ MixAll.properties2Object(configuration.getAllConfigs(), config);
+ configuration.registerConfig(config);
+ }
+}