You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/09/22 01:30:20 UTC

[rocketmq-streams] branch main updated: feature(configuable) support store configuable in db

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new d19c775e feature(configuable) support store configuable in db
     new 9b90e048 Merge pull request #218 from ni-ze/supportRsqldb
d19c775e is described below

commit d19c775e560bcb967308b3f869e9d26af80904b6
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Sep 21 20:40:23 2022 +0800

    feature(configuable) support store configuable in db
---
 .../streams/common/configure/ConfigureFileKey.java | 39 ++------------
 .../service/AbstractConfigurableService.java       |  3 +-
 .../service/ConfigurableServiceFactory.java        |  4 +-
 .../streams/db/configuable/DBConfigureService.java | 63 +++++++++++++---------
 4 files changed, 44 insertions(+), 65 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index fef876a6..ec65c70e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.streams.common.component.ComponentCreator;
  */
 public interface ConfigureFileKey {
 
-    String CONNECT_TYPE = "dipper.configurable.service.type";
+    String CONNECT_TYPE = "configurable.storage.type";
     /**
      * 数据库url
      */
@@ -31,27 +31,11 @@ public interface ConfigureFileKey {
     String JDBC_URL = "jdbc.url";
     String JDBC_USERNAME = "jdbc.username";
     String JDBC_PASSWORD = "jdbc.password";
-    String LEASE_CONSISTENT_HASH_SUFFIX = "dipper.lease.consistent.hash.suffix";
     String JDBC_DRIVER = "jdbc.driver";
     String JDBC_TABLE_NAME = "table.name";
     String SECRECY = "dipper.configure.sec.key";
     String SECRECY_DEFAULT = "";
 
-    /**
-     * 情报的连接信息
-     */
-    String INTELLIGENCE_JDBC_URL = "intelligence.rds.jdbc.url";
-    String INTELLIGENCE_JDBC_USERNAME = "intelligence.rds.jdbc.username";
-    String INTELLIGENCE_JDBC_PASSWORD = "intelligence.rds.jdbc.password";
-    String INTELLIGENCE_SWTICH = "intelligence.switch.open";
-    String INTELLIGENCE_TIP_DB_ENDPOINT = "intelligence.tip.db.endpoint";
-    String INTELLIGENCE_AK = "intelligence_ak";
-    String INTELLIGENCE_SK = "intelligence_sk";
-    String INTELLIGENCE_REGION = "intelligence_region";
-
-    /*
-     * regex engine option, "hyperscan or re2j". when this option not set, use hyperscan default
-     */
     String DIPPER_REGEX_ENGINE = "dipper.regex.engine.option";
 
     /**
@@ -61,26 +45,9 @@ public interface ConfigureFileKey {
 
     String DIPPER_RUNNING_STATUS_DEFAULT = "true";
 
-    /**
-     * 代表常量,不需要在配置文件配置
-     */
-    String JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME = "ruleengine_configure";
-    /**
-     * 如果需要兼容老规则引擎规则,且规则存储在ruleengine_configure中时,设置为true。如果老规则迁移到了dipper_configure, 这个值不需要设置或设置成false即可。兼容老的规则引擎,老规则引擎的namespace 是name_space需要通过这个配置告诉resource做适配。
-     */
-    String JDBC_COMPATIBILITY_OLD_RULEENGINE = "mysql.compatibility.old.ruleengine";
-    /**
-     *
-     */
+
     String POLLING_TIME = "dipper.configurable.polling.time";
-    /**
-     * 代理dbchannel的class,需要继承JDBCDataSource抽象类。如果配置这个参数,则会给dbchannel增加一层代理,所有需要db访问的,都是通过open api发送sql给代理
-     */
-    String DB_PROXY_CLASS_NAME = ComponentCreator.DB_PROXY_CLASS_NAME;
-    /**
-     * 创建channel的服务
-     */
-    String DIPPER_INSTANCE_CHANNEL_CREATOR_SERVICE_NAME = ComponentCreator.DIPPER_INSTANCE_CHANNEL_CREATOR_SERVICE_NAME;
+
     /**
      * 默认的文件存储transport的name
      */
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 37eb2786..7126d04c 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -270,7 +270,8 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         return isUpdate;
     }
 
-    @Override public void insert(IConfigurable configurable) {
+    @Override
+    public void insert(IConfigurable configurable) {
         insertConfigurable(configurable);
     }
 
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
index a7a29da5..315db848 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -20,20 +20,20 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
 
 public class ConfigurableServiceFactory {
     private static ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent = ServiceLoaderComponent.getInstance(IConfigurableService.class);
-    public static final String CONFIGURABLE_SERVICE_TYPE = "dipper.configurable.service.type";
     private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
 
     public static IConfigurableService createConfigurableService(Properties properties) {
         try {
             Properties properties1 = new Properties();
             properties1.putAll(properties);
-            String type = properties1.getProperty(CONFIGURABLE_SERVICE_TYPE);
+            String type = properties1.getProperty(ConfigureFileKey.CONNECT_TYPE);
             if (StringUtil.isEmpty(type)) {
                 type = IConfigurableService.MEMORY_SERVICE_NAME;
             }
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
index 234b4ab7..5e9a88cf 100644
--- a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
@@ -44,40 +44,56 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
  */
 
 public class DBConfigureService extends AbstractConfigurableService implements IPropertyEnable {
-
     private static final Log LOG = LogFactory.getLog(DBConfigureService.class);
-    private String jdbcdriver;
+
+    private String jdbcDriver;
     private String url;
     private String userName;
     private String password;
     private String tableName = "dipper_configure";
-    @Deprecated
-    private boolean isCompatibilityOldRuleEngine = false;//兼容老规则引擎使用,正常场景不需要理会
 
-    public DBConfigureService(String jdbcdriver, String url, String userName, String password) {
-        this(jdbcdriver, url, userName, password, null);
+    private boolean tableCreated = false;
+
+    public DBConfigureService(String jdbcDriver, String url, String userName, String password) {
+        this(jdbcDriver, url, userName, password, null);
     }
 
-    public DBConfigureService(String jdbcdriver, String url, String userName, String password, String tableName) {
+    public DBConfigureService(String jdbcDriver, String url, String userName, String password, String tableName) {
         this.url = url;
-        this.jdbcdriver = jdbcdriver;
+        this.jdbcDriver = jdbcDriver;
         this.userName = userName;
         this.password = password;
         this.tableName = tableName;
-        LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url
+        LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcDriver + ",url:" + this.url
             + ",username:" + userName + ",password:" + password);
-        regJdbcDriver(jdbcdriver);
+        regJdbcDriver(jdbcDriver);
     }
 
     public DBConfigureService() {
+        createTableIfNotExist();
     }
 
-    /**
-     * @param properties
-     */
     public DBConfigureService(Properties properties) {
         super(properties);
         initProperty(properties);
+        createTableIfNotExist();
+    }
+
+    private void createTableIfNotExist() {
+        if (tableCreated) {
+            return;
+        }
+
+        synchronized (this) {
+            if (!tableCreated) {
+                JDBCDriver jdbcDriver = createResouce();
+
+                String sql = (Configure.createTableSQL(tableName));
+                jdbcDriver.execute(sql);
+
+                tableCreated = true;
+            }
+        }
     }
 
     @Override
@@ -107,9 +123,7 @@ public class DBConfigureService extends AbstractConfigurableService implements I
         JDBCDriver resource = createResouce();
         try {
             String namespace = "namespace";
-            if (isCompatibilityOldRuleEngine && AbstractComponent.JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME.equals(tableName)) {
-                namespace = "name_space";
-            }
+
             String sql = "SELECT * FROM `" + tableName + "` WHERE " + namespace + " in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
             if (StringUtil.isNotEmpty(type)) {
                 sql = sql + " and type='" + type + "'";
@@ -198,12 +212,12 @@ public class DBConfigureService extends AbstractConfigurableService implements I
     }
 
     protected JDBCDriver createResouce() {
-        JDBCDriver resource = DriverBuilder.createDriver(this.jdbcdriver, this.url, this.userName, this.password);
+        JDBCDriver resource = DriverBuilder.createDriver(this.jdbcDriver, this.url, this.userName, this.password);
         return resource;
     }
 
-    public void setJdbcdriver(String jdbcdriver) {
-        this.jdbcdriver = jdbcdriver;
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
     }
 
     public void setUrl(String url) {
@@ -233,21 +247,18 @@ public class DBConfigureService extends AbstractConfigurableService implements I
 
     @Override
     public void initProperty(Properties properties) {
-        this.jdbcdriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
-        regJdbcDriver(jdbcdriver);
+        this.jdbcDriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
+        regJdbcDriver(jdbcDriver);
         this.url = properties.getProperty(AbstractComponent.JDBC_URL);
         this.userName = properties.getProperty(AbstractComponent.JDBC_USERNAME);
         this.password = properties.getProperty(AbstractComponent.JDBC_PASSWORD);
         String tableName = properties.getProperty(AbstractComponent.JDBC_TABLE_NAME);
-        String isCompatibilityOldRuleEngine = properties.getProperty(AbstractComponent.JDBC_COMPATIBILITY_OLD_RULEENGINE);
-        if (StringUtil.isNotEmpty(isCompatibilityOldRuleEngine)) {
-            this.isCompatibilityOldRuleEngine = true;
-        }
+
         if (StringUtil.isNotEmpty(tableName)) {
             this.tableName = tableName;
         }
         LOG.info(
-            "Properties resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName
+            "Properties resource ,the info is: driver:" + this.jdbcDriver + ",url:" + this.url + ",username:" + userName
                 + ",password:" + password);
     }