You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/05 12:54:38 UTC

[rocketmq-streams] 40/46: add dbinit module, it will create tables into user's database, the first version can support MySql only

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 320904471e1b4c975404598dcf045beaa284f6cf
Author: xstorm1 <xs...@live.cn>
AuthorDate: Tue Aug 3 14:42:10 2021 +0800

    add dbinit module, it will create tables into user's database, the first version can support MySql only
---
 pom.xml                                            |   2 +-
 .../streams/common/configure/ConfigureFileKey.java |   1 +
 .../rocketmq/streams/db/driver/orm/ORMUtil.java    |  23 +++
 rocketmq-streams-dbinit/pom.xml                    |  38 ++++
 .../streams/dbinit/mysql/delegate/DBDelegate.java  |   9 +
 .../dbinit/mysql/delegate/DBDelegateFactory.java   |  27 +++
 .../streams/dbinit/mysql/delegate/DBType.java      |   6 +
 .../dbinit/mysql/delegate/MysqlDelegate.java       |  48 +++++
 .../src/main/resources/tables_mysql_innodb.sql     | 199 +++++++++++++++++++++
 9 files changed, 352 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 3675792..2969816 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,13 +31,13 @@
         <module>rocketmq-streams-dim</module>
         <module>rocketmq-streams-transport-minio</module>
         <module>rocketmq-streams-script</module>
-        <module>rocketmq-streams-script-python</module>
         <module>rocketmq-streams-configurable</module>
         <module>rocketmq-streams-serviceloader</module>
         <module>rocketmq-streams-filter</module>
         <module>rocketmq-streams-schedule</module>
         <module>rocketmq-streams-lease</module>
         <module>rocketmq-streams-db-operator</module>
+        <module>rocketmq-streams-dbinit</module>
         <module>rocketmq-streams-window</module>
         <module>rocketmq-streams-clients</module>
         <module>rocketmq-streams-channel-rocketmq</module>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
index 316a3b2..e6a796d 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java
@@ -27,6 +27,7 @@ public interface ConfigureFileKey {
     /**
      * 数据库url
      */
+    String DB_TYPE = "dipper.rds.jdbc.type";
     String JDBC_URL = "dipper.rds.jdbc.url";
     String JDBC_USERNAME = "dipper.rds.jdbc.username";
     String JDBC_PASSWORD = "dipper.rds.jdbc.password";
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
index 20529b0..670cb76 100644
--- a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
@@ -161,6 +161,29 @@ public class ORMUtil {
         }
     }
 
+
+    public static boolean executeSQL(String sql, Object paras, String driver, final String url, final String userName,
+                                     final String password) {
+        if (paras != null) {
+            sql = SQLUtil.parseIbatisSQL(paras, sql);
+        }
+        JDBCDriver dataSource = null;
+        try {
+            dataSource = DriverBuilder.createDriver(driver, url, userName, password);
+            dataSource.execute(sql);
+            return true;
+        } catch (Exception e) {
+            String errorMsg = ("execute sql  error ,the sql is " + sql + ". the error msg is " + e.getMessage());
+            LOG.error(errorMsg);
+            e.printStackTrace();
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
     /**
      * 把一个对象的字段拼接成where条件,如果字段值为null,不拼接
      *
diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml
new file mode 100644
index 0000000..3434616
--- /dev/null
+++ b/rocketmq-streams-dbinit/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-streams</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rocketmq-streams-dbinit</artifactId>
+    <name>ROCKETMQ STREAMS :: dbinit</name>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+
+                <includes>
+                    <include>**/*.sql</include>
+                    <include>**/*.properties</include>
+                </includes>
+
+
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-db-operator</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java
new file mode 100644
index 0000000..2737fb5
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java
@@ -0,0 +1,9 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+public interface DBDelegate {
+
+    public void init(String driver, String url, String userName,
+                              String password);
+
+    public void init();
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java
new file mode 100644
index 0000000..3b02516
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java
@@ -0,0 +1,27 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+
+public class DBDelegateFactory {
+
+    public static DBDelegate getDelegate() {
+        String dbType = ComponentCreator.getProperties().getProperty(ConfigureFileKey.DB_TYPE);
+        if (dbType == null || "".equalsIgnoreCase(dbType)) {
+            dbType = DBType.DB_MYSQL;
+        }
+        if (DBType.DB_MYSQL.equalsIgnoreCase(dbType)) {
+            return new MysqlDelegate();
+        }
+
+        return new MysqlDelegate();
+    }
+
+    public static DBDelegate getDelegate(String dbType) {
+        if (DBType.DB_MYSQL.equalsIgnoreCase(dbType)) {
+            return new MysqlDelegate();
+        }
+
+        return new MysqlDelegate();
+    }
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java
new file mode 100644
index 0000000..d8908f8
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java
@@ -0,0 +1,6 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+public class DBType {
+
+    public static final String DB_MYSQL = "MYSQL";
+}
diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java
new file mode 100644
index 0000000..1295368
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java
@@ -0,0 +1,48 @@
+package org.apache.rocketmq.streams.dbinit.mysql.delegate;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+import java.io.IOException;
+import java.net.URL;
+
+public class MysqlDelegate implements DBDelegate {
+
+    public static final Log LOG = LogFactory.getLog(MysqlDelegate.class);
+
+
+    @Override
+    public void init(String driver, final String url, final String userName,
+                     final String password) {
+        String[] sqls = loadSqls();
+        for (String sql : sqls) {
+            ORMUtil.executeSQL(sql, null, driver, url, userName, password);
+        }
+    }
+
+    @Override
+    public void init() {
+        String[] sqls = loadSqls();
+        for (String sql : sqls) {
+            ORMUtil.executeSQL(sql, null);
+        }
+    }
+
+    private String[] loadSqls() {
+        String[] sqls = null;
+        URL url = this.getClass().getClassLoader().getResource("tables_mysql_innodb.sql");
+        try {
+            String tables = FileUtil.loadFileContent(url.openStream());
+            sqls = tables.split(";");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Init db sqls : " + tables);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return sqls;
+    }
+
+}
diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
new file mode 100644
index 0000000..dc5771f
--- /dev/null
+++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
@@ -0,0 +1,199 @@
+CREATE TABLE IF NOT EXISTS  `window_max_value` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `max_value` bigint(20) unsigned NOT NULL,
+  `max_event_time` bigint(20) unsigned NOT NULL,
+  `msg_key` varchar(256) NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk__ket` (`msg_key`(250)),
+  KEY `idx_modifytime` (`gmt_modified`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `window_value` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `start_time` varchar(20) NOT NULL,
+  `end_time` varchar(20) NOT NULL,
+  `max_offset` longtext,
+  `group_by` text,
+  `agg_column_result` longtext,
+  `computed_column_result` longtext,
+  `version` varchar(64) DEFAULT NULL,
+  `name_space` varchar(256) DEFAULT NULL,
+  `configure_name` varchar(256) DEFAULT NULL,
+  `msg_key` varchar(64) NOT NULL,
+  `window_instance_id` varchar(64) NOT NULL,
+  `partition` varchar(512) DEFAULT NULL,
+  `partition_num` bigint(20) DEFAULT NULL,
+  `fire_time` varchar(20) DEFAULT NULL,
+  `update_version` bigint(20) unsigned DEFAULT NULL,
+  `update_flag` bigint(20) DEFAULT NULL,
+  `window_instance_partition_id` varchar(64) DEFAULT NULL,
+  `type` varchar(64) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_window_state` (`msg_key`),
+  KEY `idx_window_instance_shuffle` (`window_instance_partition_id`,`partition_num`),
+  KEY `idx_window_instance_firetime` (`window_instance_partition_id`,`fire_time`),
+  KEY `idx_window` (`name_space`(128),`configure_name`(128),`partition`(128))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `window_task` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `task_id` varchar(64) NOT NULL,
+  `untreated_flag` int(11) NOT NULL DEFAULT '0',
+  `group_by_value` varchar(1024) NOT NULL,
+  `task_owner` varchar(256) NOT NULL,
+  `task_send_time` datetime DEFAULT NULL,
+  `send_task_msg` text NOT NULL,
+  `msg_send_time` bigint(20) DEFAULT NULL,
+  `name` varchar(128) NOT NULL,
+  `start_time` varchar(20) NOT NULL,
+  `end_time` varchar(20) NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_taskid` (`task_id`),
+  KEY `idx_flag_modifytime` (`name`,`untreated_flag`,`gmt_modified`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `window_instance` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `start_time` varchar(20) NOT NULL,
+  `end_time` varchar(20) NOT NULL,
+  `fire_time` varchar(20) NOT NULL,
+  `window_name` varchar(128) NOT NULL,
+  `window_name_space` varchar(128) NOT NULL,
+  `status` tinyint(4) NOT NULL DEFAULT '0',
+  `version` int(11) DEFAULT '0',
+  `window_instance_key` varchar(128) DEFAULT NULL,
+  `window_instance_name` varchar(128) DEFAULT NULL,
+  `window_Instance_split_name` varchar(128) DEFAULT NULL,
+  `split_id` varchar(128) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_window_instance_uniq_index` (`window_instance_key`),
+  KEY `idx_gmt_modified` (`fire_time`,`window_name`,`window_name_space`,`status`),
+  KEY `idx_windowinstance_name` (`window_instance_name`),
+  KEY `idx_windowinstance_split_name` (`window_Instance_split_name`),
+  KEY `idx_windowinstance_split_name_firetime` (`window_Instance_split_name`,`fire_time`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `lease_info` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `lease_name` varchar(255) NOT NULL,
+  `lease_user_ip` varchar(255) NOT NULL,
+  `lease_end_time` varchar(255) NOT NULL,
+  `status` int(11) NOT NULL DEFAULT '1',
+  `version` bigint(20) NOT NULL,
+  `candidate_lease_ip` varchar(255) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_name` (`lease_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `dipper_sql_configure` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `namespace` varchar(32) NOT NULL,
+  `type` varchar(32) NOT NULL,
+  `name` varchar(128) NOT NULL,
+  `json_value` longtext NOT NULL,
+  `request_id` varchar(128) NOT NULL,
+  `account_id` varchar(32) NOT NULL,
+  `account_name` varchar(32) NOT NULL,
+  `account_nickname` varchar(32) NOT NULL,
+  `client_ip` varchar(64) NOT NULL,
+  `status` tinyint(3) unsigned NOT NULL DEFAULT '0',
+  `is_publish` int(11) NOT NULL DEFAULT '0',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),
+  KEY `idx_namespace` (`namespace`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `dipper_configure` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime NOT NULL,
+  `gmt_modified` datetime NOT NULL,
+  `namespace` varchar(32) NOT NULL,
+  `type` varchar(32) NOT NULL,
+  `name` varchar(128) NOT NULL,
+  `json_value` text NOT NULL,
+  `request_id` varchar(128) NOT NULL,
+  `account_id` varchar(32) NOT NULL,
+  `account_name` varchar(32) NOT NULL,
+  `account_nickname` varchar(32) NOT NULL,
+  `client_ip` varchar(64) NOT NULL,
+  `status` tinyint(3) unsigned NOT NULL DEFAULT '0',
+  `isPublish` int(1) NOT NULL DEFAULT '0',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),
+  KEY `idx_namespace` (`namespace`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `join_right_state` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime DEFAULT NULL,
+  `gmt_modified` datetime DEFAULT NULL,
+  `window_id` bigint(20) DEFAULT NULL,
+  `window_name` varchar(200) DEFAULT NULL,
+  `window_name_space` varchar(45) DEFAULT NULL,
+  `message_id` varchar(200) DEFAULT NULL,
+  `message_key` varchar(32) DEFAULT NULL,
+  `message_time` datetime DEFAULT NULL,
+  `message_body` longtext,
+  `msg_key` varchar(400) DEFAULT NULL,
+  `window_instance_id` varchar(200) DEFAULT NULL,
+  `partition` varchar(200) DEFAULT NULL,
+  `partition_num` bigint(20) DEFAULT NULL,
+  `window_instance_partition_id` varchar(200) DEFAULT NULL,
+  `version` varchar(64) DEFAULT NULL,
+  `update_flag` bigint(20) DEFAULT NULL,
+  `name_space` varchar(256) DEFAULT NULL,
+  `configure_name` varchar(256) DEFAULT NULL,
+  `type` varchar(64) DEFAULT NULL,
+  `name` varchar(64) DEFAULT NULL,
+  `update_version` bigint(20) unsigned DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_message_id_unique` (`message_id`),
+  KEY `idx_message_key_index` (`message_key`),
+  KEY `idx_gmt_create_index` (`gmt_create`),
+  KEY `idx_window_name_index` (`window_name`(70)),
+  KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE IF NOT EXISTS  `join_left_state` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `gmt_create` datetime DEFAULT NULL,
+  `gmt_modified` datetime DEFAULT NULL,
+  `window_id` bigint(20) DEFAULT NULL,
+  `window_name` varchar(200) DEFAULT NULL,
+  `window_name_space` varchar(45) DEFAULT NULL,
+  `message_id` varchar(200) DEFAULT NULL,
+  `message_key` varchar(32) DEFAULT NULL,
+  `message_time` datetime DEFAULT NULL,
+  `message_body` longtext,
+  `msg_key` varchar(400) DEFAULT NULL,
+  `window_instance_id` varchar(200) DEFAULT NULL,
+  `partition` varchar(200) DEFAULT NULL,
+  `partition_num` bigint(20) DEFAULT NULL,
+  `window_instance_partition_id` varchar(200) DEFAULT NULL,
+  `version` varchar(64) DEFAULT NULL,
+  `update_flag` bigint(20) DEFAULT NULL,
+  `name_space` varchar(256) DEFAULT NULL,
+  `configure_name` varchar(256) DEFAULT NULL,
+  `type` varchar(64) DEFAULT NULL,
+  `name` varchar(64) DEFAULT NULL,
+  `update_version` bigint(20) unsigned DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `uk_message_id_unique` (`message_id`),
+  KEY `idx_message_key_index` (`message_key`),
+  KEY `idx_gmt_create_index` (`gmt_create`),
+  KEY `idx_window_name_index` (`window_name`(70)),
+  KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+