You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/20 06:35:08 UTC

[rocketmq] branch develop updated: [ISSUE #5089] Delegate plugin store configuration by broker

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5170e5ba [ISSUE #5089] Delegate plugin store configuration by broker
d5170e5ba is described below

commit d5170e5ba374db7f6db3652477aa48b2ce28cbf8
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Tue Sep 20 14:34:54 2022 +0800

    [ISSUE #5089] Delegate plugin store configuration by broker
---
 .../apache/rocketmq/broker/BrokerController.java   |   6 +-
 .../store}/plugin/AbstractPluginMessageStore.java  |   2 +-
 .../store}/plugin/MessageStoreFactory.java         |  91 ++++++++-------
 .../store}/plugin/MessageStorePluginContext.java   | 126 +++++++++++----------
 4 files changed, 114 insertions(+), 111 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e0262a105..6c79559c3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -69,8 +69,8 @@ import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
-import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
-import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
+import org.apache.rocketmq.store.plugin.MessageStoreFactory;
+import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
 import org.apache.rocketmq.broker.processor.AckMessageProcessor;
 import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
 import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
@@ -729,7 +729,7 @@ public class BrokerController {
                 }
                 this.brokerStats = new BrokerStats(defaultMessageStore);
                 //load plugin
-                MessageStorePluginContext context = new MessageStorePluginContext(this, messageStoreConfig, brokerStatsManager, messageArrivingListener);
+                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
                 this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
                 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                 if (this.brokerConfig.isEnableControllerMode()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
similarity index 99%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 3164337f7..47f6cc9d7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.broker.plugin;
+package org.apache.rocketmq.store.plugin;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
similarity index 87%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
index b64ab5a40..8d929ea56 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStoreFactory.java
@@ -1,46 +1,45 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.plugin;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import org.apache.rocketmq.store.MessageStore;
-
-public final class MessageStoreFactory {
-    public final static MessageStore build(MessageStorePluginContext context,
-        MessageStore messageStore) throws IOException {
-        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
-        if (plugin != null && plugin.trim().length() != 0) {
-            String[] pluginClasses = plugin.split(",");
-            for (int i = pluginClasses.length - 1; i >= 0; --i) {
-                String pluginClass = pluginClasses[i];
-                try {
-                    @SuppressWarnings("unchecked")
-                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass);
-                    Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
-                    AbstractPluginMessageStore pluginMessageStore = construct.newInstance(context, messageStore);
-                    messageStore = pluginMessageStore;
-                }
-                catch (Throwable e) {
-                    throw new RuntimeException("Initialize plugin's class: " + pluginClass + " not found!", e);
-                }
-            }
-        }
-        return messageStore;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.plugin;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import org.apache.rocketmq.store.MessageStore;
+
+public final class MessageStoreFactory { // Wraps a base MessageStore with the plugin classes named in the broker config.
+    public static MessageStore build(MessageStorePluginContext context, // Returns messageStore unchanged when no plugin is configured, else the outermost plugin wrapper.
+        MessageStore messageStore) throws IOException { // NOTE(review): no statement in this body visibly throws IOException.
+        String plugin = context.getBrokerConfig().getMessageStorePlugIn(); // Comma-separated plugin class names taken from BrokerConfig.
+        if (plugin != null && plugin.trim().length() != 0) { // Null or whitespace-only setting means "no plugins".
+            String[] pluginClasses = plugin.split(","); // NOTE(review): entries are not trimmed, so whitespace around a comma stays in the name given to Class.forName.
+            for (int i = pluginClasses.length - 1; i >= 0; --i) { // Last-to-first: each plugin wraps the store built so far, so the first listed class ends up outermost.
+                String pluginClass = pluginClasses[i];
+                try {
+                    @SuppressWarnings("unchecked")
+                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); // Unchecked cast: the class type is not verified on this line.
+                    Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); // Plugin needs a public (MessageStorePluginContext, MessageStore) constructor.
+                    AbstractPluginMessageStore pluginMessageStore = construct.newInstance(context, messageStore); // The store built so far is passed in as the second constructor argument.
+                    messageStore = pluginMessageStore; // The new wrapper is what the next plugin (or the caller) receives.
+                } catch (Throwable e) { // Catches every failure in the try block, not only a missing class.
+                    throw new RuntimeException("Initialize plugin's class: " + pluginClass + " not found!", e); // NOTE(review): says "not found" for all failure kinds; the real reason is in the attached cause.
+                }
+            }
+        }
+        return messageStore;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
similarity index 73%
rename from broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
rename to store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
index c132cf93d..b10f9f2b5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/MessageStorePluginContext.java
@@ -1,61 +1,65 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.plugin;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.store.MessageArrivingListener;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
-public class MessageStorePluginContext {
-    private BrokerController controller;
-    private MessageStoreConfig messageStoreConfig;
-    private BrokerStatsManager brokerStatsManager;
-    private MessageArrivingListener messageArrivingListener;
-
-    public MessageStorePluginContext(BrokerController controller, MessageStoreConfig messageStoreConfig,
-        BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener) {
-        super();
-        this.messageStoreConfig = messageStoreConfig;
-        this.brokerStatsManager = brokerStatsManager;
-        this.messageArrivingListener = messageArrivingListener;
-        this.controller = controller;
-    }
-
-    public MessageStoreConfig getMessageStoreConfig() {
-        return messageStoreConfig;
-    }
-
-    public BrokerStatsManager getBrokerStatsManager() {
-        return brokerStatsManager;
-    }
-
-    public MessageArrivingListener getMessageArrivingListener() {
-        return messageArrivingListener;
-    }
-
-    public BrokerConfig getBrokerConfig() {
-        return controller.getBrokerConfig();
-    }
-
-    public BrokerController getController() {
-        return controller;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.plugin;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class MessageStorePluginContext { // Bundle of broker-side objects that MessageStoreFactory passes to each plugin constructor.
+    private MessageStoreConfig messageStoreConfig;
+    private BrokerStatsManager brokerStatsManager;
+    private MessageArrivingListener messageArrivingListener;
+    private BrokerConfig brokerConfig; // Held directly; the previous version obtained it through BrokerController.
+    private final Configuration configuration; // No getter is exposed; used only by registerConfiguration().
+
+    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, // Stores the five arguments as-is; no copying or validation is done.
+        BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
+        BrokerConfig brokerConfig, Configuration configuration) {
+        super(); // Redundant: the class has no explicit superclass.
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+        this.configuration = configuration;
+    }
+
+    public MessageStoreConfig getMessageStoreConfig() {
+        return messageStoreConfig;
+    }
+
+    public BrokerStatsManager getBrokerStatsManager() {
+        return brokerStatsManager;
+    }
+
+    public MessageArrivingListener getMessageArrivingListener() {
+        return messageArrivingListener;
+    }
+
+    public BrokerConfig getBrokerConfig() { // MessageStoreFactory.build reads the plugin class list through this getter.
+        return brokerConfig;
+    }
+
+    public void registerConfiguration(Object config) { // Lets a caller add its own config object to the Configuration given at construction.
+        MixAll.properties2Object(configuration.getAllConfigs(), config); // NOTE(review): presumably copies matching entries of the current properties onto config - confirm in MixAll.
+        configuration.registerConfig(config); // Runs after the line above, so config is registered in its populated state.
+    }
+}