You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/09/22 01:30:20 UTC
[rocketmq-streams] branch main updated: feature(configuable) support store configuable in db
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new d19c775e feature(configuable) support store configuable in db
new 9b90e048 Merge pull request #218 from ni-ze/supportRsqldb
d19c775e is described below
commit d19c775e560bcb967308b3f869e9d26af80904b6
Author: 维章 <un...@gmail.com>
AuthorDate: Wed Sep 21 20:40:23 2022 +0800
feature(configuable) support store configuable in db
---
.../streams/common/configure/ConfigureFileKey.java | 39 ++------------
.../service/AbstractConfigurableService.java | 3 +-
.../service/ConfigurableServiceFactory.java | 4 +-
.../streams/db/configuable/DBConfigureService.java | 63 +++++++++++++---------
4 files changed, 44 insertions(+), 65 deletions(-)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index fef876a6..ec65c70e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.streams.common.component.ComponentCreator;
*/
public interface ConfigureFileKey {
- String CONNECT_TYPE = "dipper.configurable.service.type";
+ String CONNECT_TYPE = "configurable.storage.type";
/**
* 数据库url
*/
@@ -31,27 +31,11 @@ public interface ConfigureFileKey {
String JDBC_URL = "jdbc.url";
String JDBC_USERNAME = "jdbc.username";
String JDBC_PASSWORD = "jdbc.password";
- String LEASE_CONSISTENT_HASH_SUFFIX = "dipper.lease.consistent.hash.suffix";
String JDBC_DRIVER = "jdbc.driver";
String JDBC_TABLE_NAME = "table.name";
String SECRECY = "dipper.configure.sec.key";
String SECRECY_DEFAULT = "";
- /**
- * 情报的连接信息
- */
- String INTELLIGENCE_JDBC_URL = "intelligence.rds.jdbc.url";
- String INTELLIGENCE_JDBC_USERNAME = "intelligence.rds.jdbc.username";
- String INTELLIGENCE_JDBC_PASSWORD = "intelligence.rds.jdbc.password";
- String INTELLIGENCE_SWTICH = "intelligence.switch.open";
- String INTELLIGENCE_TIP_DB_ENDPOINT = "intelligence.tip.db.endpoint";
- String INTELLIGENCE_AK = "intelligence_ak";
- String INTELLIGENCE_SK = "intelligence_sk";
- String INTELLIGENCE_REGION = "intelligence_region";
-
- /*
- * regex engine option, "hyperscan or re2j". when this option not set, use hyperscan default
- */
String DIPPER_REGEX_ENGINE = "dipper.regex.engine.option";
/**
@@ -61,26 +45,9 @@ public interface ConfigureFileKey {
String DIPPER_RUNNING_STATUS_DEFAULT = "true";
- /**
- * 代表常量,不需要在配置文件配置
- */
- String JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME = "ruleengine_configure";
- /**
- * 如果需要兼容老规则引擎规则,且规则存储在ruleengine_configure中时,设置为true。如果老规则迁移到了dipper_configure, 这个值不需要设置或设置成false即可。兼容老的规则引擎,老规则引擎的namespace 是name_space需要通过这个配置告诉resource做适配。
- */
- String JDBC_COMPATIBILITY_OLD_RULEENGINE = "mysql.compatibility.old.ruleengine";
- /**
- *
- */
+
String POLLING_TIME = "dipper.configurable.polling.time";
- /**
- * 代理dbchannel的class,需要继承JDBCDataSource抽象类。如果配置这个参数,则会给dbchannel增加一层代理,所有需要db访问的,都是通过open api发送sql给代理
- */
- String DB_PROXY_CLASS_NAME = ComponentCreator.DB_PROXY_CLASS_NAME;
- /**
- * 创建channel的服务
- */
- String DIPPER_INSTANCE_CHANNEL_CREATOR_SERVICE_NAME = ComponentCreator.DIPPER_INSTANCE_CHANNEL_CREATOR_SERVICE_NAME;
+
/**
* 默认的文件存储transport的name
*/
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 37eb2786..7126d04c 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -270,7 +270,8 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
return isUpdate;
}
- @Override public void insert(IConfigurable configurable) {
+ @Override
+ public void insert(IConfigurable configurable) {
insertConfigurable(configurable);
}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
index a7a29da5..315db848 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -20,20 +20,20 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
public class ConfigurableServiceFactory {
private static ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent = ServiceLoaderComponent.getInstance(IConfigurableService.class);
- public static final String CONFIGURABLE_SERVICE_TYPE = "dipper.configurable.service.type";
private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
public static IConfigurableService createConfigurableService(Properties properties) {
try {
Properties properties1 = new Properties();
properties1.putAll(properties);
- String type = properties1.getProperty(CONFIGURABLE_SERVICE_TYPE);
+ String type = properties1.getProperty(ConfigureFileKey.CONNECT_TYPE);
if (StringUtil.isEmpty(type)) {
type = IConfigurableService.MEMORY_SERVICE_NAME;
}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
index 234b4ab7..5e9a88cf 100644
--- a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
@@ -44,40 +44,56 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
*/
public class DBConfigureService extends AbstractConfigurableService implements IPropertyEnable {
-
private static final Log LOG = LogFactory.getLog(DBConfigureService.class);
- private String jdbcdriver;
+
+ private String jdbcDriver;
private String url;
private String userName;
private String password;
private String tableName = "dipper_configure";
- @Deprecated
- private boolean isCompatibilityOldRuleEngine = false;//兼容老规则引擎使用,正常场景不需要理会
- public DBConfigureService(String jdbcdriver, String url, String userName, String password) {
- this(jdbcdriver, url, userName, password, null);
+ private boolean tableCreated = false;
+
+ public DBConfigureService(String jdbcDriver, String url, String userName, String password) {
+ this(jdbcDriver, url, userName, password, null);
}
- public DBConfigureService(String jdbcdriver, String url, String userName, String password, String tableName) {
+ public DBConfigureService(String jdbcDriver, String url, String userName, String password, String tableName) {
this.url = url;
- this.jdbcdriver = jdbcdriver;
+ this.jdbcDriver = jdbcDriver;
this.userName = userName;
this.password = password;
this.tableName = tableName;
- LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url
+ LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcDriver + ",url:" + this.url
+ ",username:" + userName + ",password:" + password);
- regJdbcDriver(jdbcdriver);
+ regJdbcDriver(jdbcDriver);
}
public DBConfigureService() {
+ createTableIfNotExist();
}
- /**
- * @param properties
- */
public DBConfigureService(Properties properties) {
super(properties);
initProperty(properties);
+ createTableIfNotExist();
+ }
+
+ private void createTableIfNotExist() {
+ if (tableCreated) {
+ return;
+ }
+
+ synchronized (this) {
+ if (!tableCreated) {
+ JDBCDriver jdbcDriver = createResouce();
+
+ String sql = (Configure.createTableSQL(tableName));
+ jdbcDriver.execute(sql);
+
+ tableCreated = true;
+ }
+ }
}
@Override
@@ -107,9 +123,7 @@ public class DBConfigureService extends AbstractConfigurableService implements I
JDBCDriver resource = createResouce();
try {
String namespace = "namespace";
- if (isCompatibilityOldRuleEngine && AbstractComponent.JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME.equals(tableName)) {
- namespace = "name_space";
- }
+
String sql = "SELECT * FROM `" + tableName + "` WHERE " + namespace + " in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
if (StringUtil.isNotEmpty(type)) {
sql = sql + " and type='" + type + "'";
@@ -198,12 +212,12 @@ public class DBConfigureService extends AbstractConfigurableService implements I
}
protected JDBCDriver createResouce() {
- JDBCDriver resource = DriverBuilder.createDriver(this.jdbcdriver, this.url, this.userName, this.password);
+ JDBCDriver resource = DriverBuilder.createDriver(this.jdbcDriver, this.url, this.userName, this.password);
return resource;
}
- public void setJdbcdriver(String jdbcdriver) {
- this.jdbcdriver = jdbcdriver;
+ public void setJdbcDriver(String jdbcDriver) {
+ this.jdbcDriver = jdbcDriver;
}
public void setUrl(String url) {
@@ -233,21 +247,18 @@ public class DBConfigureService extends AbstractConfigurableService implements I
@Override
public void initProperty(Properties properties) {
- this.jdbcdriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
- regJdbcDriver(jdbcdriver);
+ this.jdbcDriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
+ regJdbcDriver(jdbcDriver);
this.url = properties.getProperty(AbstractComponent.JDBC_URL);
this.userName = properties.getProperty(AbstractComponent.JDBC_USERNAME);
this.password = properties.getProperty(AbstractComponent.JDBC_PASSWORD);
String tableName = properties.getProperty(AbstractComponent.JDBC_TABLE_NAME);
- String isCompatibilityOldRuleEngine = properties.getProperty(AbstractComponent.JDBC_COMPATIBILITY_OLD_RULEENGINE);
- if (StringUtil.isNotEmpty(isCompatibilityOldRuleEngine)) {
- this.isCompatibilityOldRuleEngine = true;
- }
+
if (StringUtil.isNotEmpty(tableName)) {
this.tableName = tableName;
}
LOG.info(
- "Properties resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName
+ "Properties resource ,the info is: driver:" + this.jdbcDriver + ",url:" + this.url + ",username:" + userName
+ ",password:" + password);
}