You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/05 12:54:38 UTC
[rocketmq-streams] 40/46: add dbinit module,
it will create tables into user's database,
the first version can support MySql only
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 320904471e1b4c975404598dcf045beaa284f6cf
Author: xstorm1 <xs...@live.cn>
AuthorDate: Tue Aug 3 14:42:10 2021 +0800
add dbinit module, it will create tables into user's database, the first version can support MySql only
---
pom.xml | 2 +-
.../streams/common/configure/ConfigureFileKey.java | 1 +
.../rocketmq/streams/db/driver/orm/ORMUtil.java | 23 +++
rocketmq-streams-dbinit/pom.xml | 38 ++++
.../streams/dbinit/mysql/delegate/DBDelegate.java | 9 +
.../dbinit/mysql/delegate/DBDelegateFactory.java | 27 +++
.../streams/dbinit/mysql/delegate/DBType.java | 6 +
.../dbinit/mysql/delegate/MysqlDelegate.java | 48 +++++
.../src/main/resources/tables_mysql_innodb.sql | 199 +++++++++++++++++++++
9 files changed, 352 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 3675792..2969816 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,13 +31,13 @@
<module>rocketmq-streams-dim</module>
<module>rocketmq-streams-transport-minio</module>
<module>rocketmq-streams-script</module>
- <module>rocketmq-streams-script-python</module>
<module>rocketmq-streams-configurable</module>
<module>rocketmq-streams-serviceloader</module>
<module>rocketmq-streams-filter</module>
<module>rocketmq-streams-schedule</module>
<module>rocketmq-streams-lease</module>
<module>rocketmq-streams-db-operator</module>
+ <module>rocketmq-streams-dbinit</module>
<module>rocketmq-streams-window</module>
<module>rocketmq-streams-clients</module>
<module>rocketmq-streams-channel-rocketmq</module>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index 316a3b2..e6a796d 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -27,6 +27,7 @@ public interface ConfigureFileKey {
/**
* 数据库url
*/
+ String DB_TYPE = "dipper.rds.jdbc.type";
String JDBC_URL = "dipper.rds.jdbc.url";
String JDBC_USERNAME = "dipper.rds.jdbc.username";
String JDBC_PASSWORD = "dipper.rds.jdbc.password";
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
index 20529b0..670cb76 100644
--- a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
@@ -161,6 +161,29 @@ public class ORMUtil {
}
}
+
+    /**
+     * Executes one SQL statement through a driver that is created for this call only and destroyed
+     * again before the method returns, whether or not the statement succeeded.
+     *
+     * @param sql      the statement to run; when {@code paras} is not null it is first rewritten by
+     *                 {@code SQLUtil.parseIbatisSQL(paras, sql)}
+     * @param paras    parameter object handed to {@code SQLUtil.parseIbatisSQL}, or {@code null} to run
+     *                 {@code sql} unchanged
+     * @param driver   driver argument passed to {@code DriverBuilder.createDriver}
+     * @param url      url argument passed to {@code DriverBuilder.createDriver}
+     * @param userName user name argument passed to {@code DriverBuilder.createDriver}
+     * @param password password argument passed to {@code DriverBuilder.createDriver}
+     * @return {@code true} when the statement was executed without an exception
+     * @throws RuntimeException wrapping any exception raised while creating the driver or executing the
+     *                          statement; the message contains the SQL that failed
+     */
+    public static boolean executeSQL(String sql, Object paras, String driver, final String url, final String userName,
+        final String password) {
+        if (paras != null) {
+            sql = SQLUtil.parseIbatisSQL(paras, sql);
+        }
+        JDBCDriver dataSource = null;
+        try {
+            dataSource = DriverBuilder.createDriver(driver, url, userName, password);
+            dataSource.execute(sql);
+            return true;
+        } catch (Exception e) {
+            String errorMsg = "execute sql error ,the sql is " + sql + ". the error msg is " + e.getMessage();
+            // Hand the cause to the logger so the stack trace lands in the application log
+            // instead of being printed separately on stderr.
+            LOG.error(errorMsg, e);
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            // Always release the one-off driver, including on failure.
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
/**
* 把一个对象的字段拼接成where条件,如果字段值为null,不拼接
*
diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml
new file mode 100644
index 0000000..3434616
--- /dev/null
+++ b/rocketmq-streams-dbinit/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-streams</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rocketmq-streams-dbinit</artifactId>
+ <name>ROCKETMQ STREAMS :: dbinit</name>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+
+ <includes>
+ <include>**/*.sql</include>
+ <include>**/*.properties</include>
+ </includes>
+
+
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-db-operator</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java
new file mode 100644
index 0000000..2737fb5
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java
@@ -0,0 +1,9 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+/**
+ * Contract for a component that initializes a database.
+ */
+public interface DBDelegate {
+
+    /**
+     * Initializes the database using the connection settings supplied by the caller.
+     *
+     * @param driver   driver setting of the target database
+     * @param url      url of the target database
+     * @param userName user name used to connect
+     * @param password password used to connect
+     */
+    void init(String driver, String url, String userName, String password);
+
+    /**
+     * Initializes the database without explicit connection settings; where the connection comes
+     * from is left to the implementation.
+     */
+    void init();
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java
new file mode 100644
index 0000000..3b02516
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java
@@ -0,0 +1,27 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+
+/**
+ * Hands out the {@link DBDelegate} for a database type. MySQL is the only implementation, so it is
+ * returned for every type, recognised or not.
+ */
+public class DBDelegateFactory {
+
+    /**
+     * Picks the delegate for the type configured under {@link ConfigureFileKey#DB_TYPE}; a missing or
+     * empty value is treated as {@link DBType#DB_MYSQL}.
+     *
+     * @return a new delegate instance
+     */
+    public static DBDelegate getDelegate() {
+        String configuredType = ComponentCreator.getProperties().getProperty(ConfigureFileKey.DB_TYPE);
+        boolean unset = configuredType == null || configuredType.isEmpty();
+        return getDelegate(unset ? DBType.DB_MYSQL : configuredType);
+    }
+
+    /**
+     * Picks the delegate for the given type; the comparison ignores case.
+     *
+     * @param dbType database type name, may be {@code null}
+     * @return a new delegate instance
+     */
+    public static DBDelegate getDelegate(String dbType) {
+        if (!DBType.DB_MYSQL.equalsIgnoreCase(dbType)) {
+            // Unrecognised type: MySQL is the only implementation, so it doubles as the fallback.
+            return new MysqlDelegate();
+        }
+        return new MysqlDelegate();
+    }
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java
new file mode 100644
index 0000000..d8908f8
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java
@@ -0,0 +1,6 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+/**
+ * Names of the database types known to the dbinit module.
+ */
+public class DBType {
+
+    /** Type name for MySQL; DBDelegateFactory compares it ignoring case. */
+    public static final String DB_MYSQL = "MYSQL";
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java
new file mode 100644
index 0000000..1295368
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java
@@ -0,0 +1,48 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * {@link DBDelegate} for MySQL. Reads the bundled {@code tables_mysql_innodb.sql} classpath resource,
+ * splits it on {@code ';'} and executes the resulting statements one by one through {@link ORMUtil}.
+ */
+public class MysqlDelegate implements DBDelegate {
+
+    public static final Log LOG = LogFactory.getLog(MysqlDelegate.class);
+
+    /** Classpath resource that holds the statements, separated by ';'. */
+    private static final String TABLES_SQL_RESOURCE = "tables_mysql_innodb.sql";
+
+    /**
+     * Executes every statement of the bundled script with the given connection settings.
+     *
+     * @param driver   driver setting forwarded to {@code ORMUtil.executeSQL}
+     * @param url      url forwarded to {@code ORMUtil.executeSQL}
+     * @param userName user name forwarded to {@code ORMUtil.executeSQL}
+     * @param password password forwarded to {@code ORMUtil.executeSQL}
+     * @throws IllegalStateException if the script is missing from the classpath or cannot be read
+     */
+    @Override
+    public void init(String driver, final String url, final String userName,
+        final String password) {
+        for (String sql : loadSqls()) {
+            ORMUtil.executeSQL(sql, null, driver, url, userName, password);
+        }
+    }
+
+    /**
+     * Executes every statement of the bundled script through {@code ORMUtil.executeSQL(sql, null)},
+     * i.e. without passing connection settings explicitly.
+     *
+     * @throws IllegalStateException if the script is missing from the classpath or cannot be read
+     */
+    @Override
+    public void init() {
+        for (String sql : loadSqls()) {
+            ORMUtil.executeSQL(sql, null);
+        }
+    }
+
+    /**
+     * Loads the bundled script and splits it into individual statements.
+     *
+     * @return the non-blank statements in file order, trimmed; never {@code null}
+     * @throws IllegalStateException if the resource is missing or cannot be read
+     */
+    private String[] loadSqls() {
+        URL url = this.getClass().getClassLoader().getResource(TABLES_SQL_RESOURCE);
+        if (url == null) {
+            // getResource signals "not found" with null; fail with a message instead of an NPE below.
+            throw new IllegalStateException("can not find resource " + TABLES_SQL_RESOURCE + " on the classpath");
+        }
+
+        String tables;
+        // try-with-resources guarantees the stream opened here is closed by this class.
+        try (java.io.InputStream in = url.openStream()) {
+            tables = FileUtil.loadFileContent(in);
+        } catch (IOException e) {
+            // Surface the failure with its cause rather than returning null to the init loops.
+            throw new IllegalStateException("can not read resource " + TABLES_SQL_RESOURCE, e);
+        }
+        if (tables == null) {
+            return new String[0];
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Init db sqls : " + tables);
+        }
+
+        // Splitting on ';' leaves whitespace-only fragments (e.g. the text after the last ';');
+        // drop them so they are not sent to the database as statements.
+        String[] fragments = tables.split(";");
+        String[] statements = new String[fragments.length];
+        int count = 0;
+        for (String fragment : fragments) {
+            String statement = fragment.trim();
+            if (!statement.isEmpty()) {
+                statements[count++] = statement;
+            }
+        }
+        String[] sqls = new String[count];
+        System.arraycopy(statements, 0, sqls, 0, count);
+        return sqls;
+    }
+
+}
diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
new file mode 100644
index 0000000..dc5771f
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
@@ -0,0 +1,199 @@
+CREATE TABLE IF NOT EXISTS `window_max_value` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `max_value` bigint(20) unsigned NOT NULL,
+ `max_event_time` bigint(20) unsigned NOT NULL,
+ `msg_key` varchar(256) NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk__ket` (`msg_key`(250)),
+ KEY `idx_modifytime` (`gmt_modified`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `window_value` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `start_time` varchar(20) NOT NULL,
+ `end_time` varchar(20) NOT NULL,
+ `max_offset` longtext,
+ `group_by` text,
+ `agg_column_result` longtext,
+ `computed_column_result` longtext,
+ `version` varchar(64) DEFAULT NULL,
+ `name_space` varchar(256) DEFAULT NULL,
+ `configure_name` varchar(256) DEFAULT NULL,
+ `msg_key` varchar(64) NOT NULL,
+ `window_instance_id` varchar(64) NOT NULL,
+ `partition` varchar(512) DEFAULT NULL,
+ `partition_num` bigint(20) DEFAULT NULL,
+ `fire_time` varchar(20) DEFAULT NULL,
+ `update_version` bigint(20) unsigned DEFAULT NULL,
+ `update_flag` bigint(20) DEFAULT NULL,
+ `window_instance_partition_id` varchar(64) DEFAULT NULL,
+ `type` varchar(64) DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_window_state` (`msg_key`),
+ KEY `idx_window_instance_shuffle` (`window_instance_partition_id`,`partition_num`),
+ KEY `idx_window_instance_firetime` (`window_instance_partition_id`,`fire_time`),
+ KEY `idx_window` (`name_space`(128),`configure_name`(128),`partition`(128))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `window_task` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ `task_id` varchar(64) NOT NULL,
+ `untreated_flag` int(11) NOT NULL DEFAULT '0',
+ `group_by_value` varchar(1024) NOT NULL,
+ `task_owner` varchar(256) NOT NULL,
+ `task_send_time` datetime DEFAULT NULL,
+ `send_task_msg` text NOT NULL,
+ `msg_send_time` bigint(20) DEFAULT NULL,
+ `name` varchar(128) NOT NULL,
+ `start_time` varchar(20) NOT NULL,
+ `end_time` varchar(20) NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_taskid` (`task_id`),
+ KEY `idx_flag_modifytime` (`name`,`untreated_flag`,`gmt_modified`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `window_instance` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `start_time` varchar(20) NOT NULL,
+ `end_time` varchar(20) NOT NULL,
+ `fire_time` varchar(20) NOT NULL,
+ `window_name` varchar(128) NOT NULL,
+ `window_name_space` varchar(128) NOT NULL,
+ `status` tinyint(4) NOT NULL DEFAULT '0',
+ `version` int(11) DEFAULT '0',
+ `window_instance_key` varchar(128) DEFAULT NULL,
+ `window_instance_name` varchar(128) DEFAULT NULL,
+ `window_Instance_split_name` varchar(128) DEFAULT NULL,
+ `split_id` varchar(128) DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_window_instance_uniq_index` (`window_instance_key`),
+ KEY `idx_gmt_modified` (`fire_time`,`window_name`,`window_name_space`,`status`),
+ KEY `idx_windowinstance_name` (`window_instance_name`),
+ KEY `idx_windowinstance_split_name` (`window_Instance_split_name`),
+ KEY `idx_windowinstance_split_name_firetime` (`window_Instance_split_name`,`fire_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `lease_info` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `lease_name` varchar(255) NOT NULL,
+ `lease_user_ip` varchar(255) NOT NULL,
+ `lease_end_time` varchar(255) NOT NULL,
+ `status` int(11) NOT NULL DEFAULT '1',
+ `version` bigint(20) NOT NULL,
+ `candidate_lease_ip` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_name` (`lease_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `dipper_sql_configure` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `namespace` varchar(32) NOT NULL,
+ `type` varchar(32) NOT NULL,
+ `name` varchar(128) NOT NULL,
+ `json_value` longtext NOT NULL,
+ `request_id` varchar(128) NOT NULL,
+ `account_id` varchar(32) NOT NULL,
+ `account_name` varchar(32) NOT NULL,
+ `account_nickname` varchar(32) NOT NULL,
+ `client_ip` varchar(64) NOT NULL,
+ `status` tinyint(3) unsigned NOT NULL DEFAULT '0',
+ `is_publish` int(11) NOT NULL DEFAULT '0',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),
+ KEY `idx_namespace` (`namespace`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `dipper_configure` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime NOT NULL,
+ `gmt_modified` datetime NOT NULL,
+ `namespace` varchar(32) NOT NULL,
+ `type` varchar(32) NOT NULL,
+ `name` varchar(128) NOT NULL,
+ `json_value` text NOT NULL,
+ `request_id` varchar(128) NOT NULL,
+ `account_id` varchar(32) NOT NULL,
+ `account_name` varchar(32) NOT NULL,
+ `account_nickname` varchar(32) NOT NULL,
+ `client_ip` varchar(64) NOT NULL,
+ `status` tinyint(3) unsigned NOT NULL DEFAULT '0',
+ `isPublish` int(1) NOT NULL DEFAULT '0',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),
+ KEY `idx_namespace` (`namespace`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `join_right_state` (
+ `id` bigint(20) NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime DEFAULT NULL,
+ `gmt_modified` datetime DEFAULT NULL,
+ `window_id` bigint(20) DEFAULT NULL,
+ `window_name` varchar(200) DEFAULT NULL,
+ `window_name_space` varchar(45) DEFAULT NULL,
+ `message_id` varchar(200) DEFAULT NULL,
+ `message_key` varchar(32) DEFAULT NULL,
+ `message_time` datetime DEFAULT NULL,
+ `message_body` longtext,
+ `msg_key` varchar(400) DEFAULT NULL,
+ `window_instance_id` varchar(200) DEFAULT NULL,
+ `partition` varchar(200) DEFAULT NULL,
+ `partition_num` bigint(20) DEFAULT NULL,
+ `window_instance_partition_id` varchar(200) DEFAULT NULL,
+ `version` varchar(64) DEFAULT NULL,
+ `update_flag` bigint(20) DEFAULT NULL,
+ `name_space` varchar(256) DEFAULT NULL,
+ `configure_name` varchar(256) DEFAULT NULL,
+ `type` varchar(64) DEFAULT NULL,
+ `name` varchar(64) DEFAULT NULL,
+ `update_version` bigint(20) unsigned DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_message_id_unique` (`message_id`),
+ KEY `idx_message_key_index` (`message_key`),
+ KEY `idx_gmt_create_index` (`gmt_create`),
+ KEY `idx_window_name_index` (`window_name`(70)),
+ KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS `join_left_state` (
+ `id` bigint(20) NOT NULL AUTO_INCREMENT,
+ `gmt_create` datetime DEFAULT NULL,
+ `gmt_modified` datetime DEFAULT NULL,
+ `window_id` bigint(20) DEFAULT NULL,
+ `window_name` varchar(200) DEFAULT NULL,
+ `window_name_space` varchar(45) DEFAULT NULL,
+ `message_id` varchar(200) DEFAULT NULL,
+ `message_key` varchar(32) DEFAULT NULL,
+ `message_time` datetime DEFAULT NULL,
+ `message_body` longtext,
+ `msg_key` varchar(400) DEFAULT NULL,
+ `window_instance_id` varchar(200) DEFAULT NULL,
+ `partition` varchar(200) DEFAULT NULL,
+ `partition_num` bigint(20) DEFAULT NULL,
+ `window_instance_partition_id` varchar(200) DEFAULT NULL,
+ `version` varchar(64) DEFAULT NULL,
+ `update_flag` bigint(20) DEFAULT NULL,
+ `name_space` varchar(256) DEFAULT NULL,
+ `configure_name` varchar(256) DEFAULT NULL,
+ `type` varchar(64) DEFAULT NULL,
+ `name` varchar(64) DEFAULT NULL,
+ `update_version` bigint(20) unsigned DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_message_id_unique` (`message_id`),
+ KEY `idx_message_key_index` (`message_key`),
+ KEY `idx_gmt_create_index` (`gmt_create`),
+ KEY `idx_window_name_index` (`window_name`(70)),
+ KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+