You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/08/02 11:19:15 UTC
[rocketmq-streams] 13/27: add module db-operator、transport-minio
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 9c898ecf0c5301f1ae1d0c144f15f96d2151ba96
Author: muyang <li...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:21:28 2021 +0800
add module db-operator、transport-minio
---
rocketmq-streams-db-operator/pom.xml | 34 ++
.../rocketmq-streams-db-operator.iml | 16 +
.../streams/db/configuable/DBConfigureService.java | 282 ++++++++++++
.../DBSupportParentConfigureService.java | 37 ++
.../rocketmq/streams/db/driver/DriverBuilder.java | 111 +++++
.../rocketmq/streams/db/driver/IDriverBudiler.java | 36 ++
.../rocketmq/streams/db/driver/JDBCDriver.java | 277 ++++++++++++
.../db/driver/batchloader/BatchRowLoader.java | 179 ++++++++
.../db/driver/batchloader/IRowOperator.java | 33 ++
.../rocketmq/streams/db/driver/orm/ORMUtil.java | 490 +++++++++++++++++++++
.../rocketmq/streams/db/operator/SQLOperator.java | 178 ++++++++
.../org/apache/rocketmq/streams/db/Person.java | 110 +++++
.../DBSupportParentConfigureServiceTest.java | 74 ++++
.../streams/db/driver/orm/ORMUtilTest.java | 86 ++++
rocketmq-streams-transport-minio/pom.xml | 25 ++
.../rocketmq-streams-transport-minio.iml | 17 +
.../transport/minio/MinioFileTransport.java | 141 ++++++
.../yundun/dipper/configurable/DataTpyeTest.java | 70 +++
.../streams/configuable/model/DataTpyeTest.java | 68 +++
.../rocketmq/streams/configuable/model/Person.java | 97 ++++
.../streams/configurable/model/Person.java | 97 ++++
.../component/ConfigurableComponent.properties | 7 +
.../src/test/resources/log4j.xml | 20 +
.../src/test/resources/pro-function.txt | 11 +
.../src/test/resources/python_script.py | 22 +
25 files changed, 2518 insertions(+)
diff --git a/rocketmq-streams-db-operator/pom.xml b/rocketmq-streams-db-operator/pom.xml
new file mode 100755
index 0000000..9a9b17b
--- /dev/null
+++ b/rocketmq-streams-db-operator/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rocketmq-streams-db-operator</artifactId>
+ <name>ROCKETMQ STREAMS :: db-operator</name>
+ <packaging>jar</packaging>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-configurable</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jdbc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+
+
+ </dependencies>
+</project>
diff --git a/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml
new file mode 100644
index 0000000..38ffb14
--- /dev/null
+++ b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5">
+ <output url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+ <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+ <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+ <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+</module>
\ No newline at end of file
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
new file mode 100644
index 0000000..ef319eb
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.interfaces.IPropertyEnable;
+import org.apache.rocketmq.streams.common.utils.AESUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+
+import java.util.*;
+
+/**
+ * Configurable service that keeps {@code IConfigurable} objects in a database table; this is the
+ * mode commonly used in production environments.
+ *
+ * <p>The database parameters can be placed in the configuration file. On startup the configurable
+ * component wraps them in a {@link Properties} object and calls
+ * {@link #DBConfigureService(Properties)} to create the instance.
+ */
+public class DBConfigureService extends AbstractConfigurableService implements IPropertyEnable {
+
+    private static final Log LOG = LogFactory.getLog(DBConfigureService.class);
+
+    /** Table used when the caller does not name one. */
+    private static final String DEFAULT_TABLE_NAME = "dipper_configure";
+
+    private String jdbcdriver;
+    private String url;
+    private String userName;
+    private String password;
+    private String tableName = DEFAULT_TABLE_NAME;
+    /** Compatibility switch for the old rule engine; can be ignored in normal scenarios. */
+    @Deprecated
+    private boolean isCompatibilityOldRuleEngine = false;
+
+    /**
+     * Creates a service that reads from and writes to the default configuration table.
+     *
+     * @param jdbcdriver JDBC driver class name; when empty the component default driver is registered
+     * @param url        JDBC connection url
+     * @param userName   database user
+     * @param password   database password
+     */
+    public DBConfigureService(String jdbcdriver, String url, String userName, String password) {
+        this(jdbcdriver, url, userName, password, null);
+    }
+
+    /**
+     * Creates a service bound to the given table.
+     *
+     * @param jdbcdriver JDBC driver class name; when empty the component default driver is registered
+     * @param url        JDBC connection url
+     * @param userName   database user
+     * @param password   database password
+     * @param tableName  configuration table; when empty the default table is kept
+     */
+    public DBConfigureService(String jdbcdriver, String url, String userName, String password, String tableName) {
+        this.url = url;
+        this.jdbcdriver = jdbcdriver;
+        this.userName = userName;
+        this.password = password;
+        // Only override the default when a table was really supplied; the 4-arg constructor passes
+        // null, which used to wipe out the default and produce "SELECT * FROM `null`".
+        if (StringUtil.isNotEmpty(tableName)) {
+            this.tableName = tableName;
+        }
+        // The password is deliberately left out of the log line.
+        LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url
+            + ",username:" + userName);
+        regJdbcDriver(jdbcdriver);
+    }
+
+    public DBConfigureService() {
+    }
+
+    /**
+     * Creates a service from the component properties.
+     *
+     * @param properties properties holding the JDBC driver, url, user name, password and table name
+     */
+    public DBConfigureService(Properties properties) {
+        super(properties);
+        initProperty(properties);
+    }
+
+    /**
+     * Loads every enabled configurable of a namespace.
+     *
+     * @param namespace namespace to load
+     * @return the result; {@code querySuccess} is false when the query or the conversion failed
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        try {
+            List<Configure> configures = selectOpening(namespace);
+            List<IConfigurable> configurables = convert(configures);
+            result.setConfigurables(configurables);
+            // Marks whether the query succeeded; when it did not, the configuration is not updated.
+            result.setQuerySuccess(true);
+        } catch (Exception e) {
+            result.setQuerySuccess(false);
+            LOG.error("load configurable error ", e);
+        }
+        return result;
+    }
+
+    /** Returns the enabled (status = 1) rows of one namespace. */
+    protected List<Configure> selectOpening(String namespace) {
+        return queryConfigureByNamespace(namespace);
+    }
+
+    /** Returns the enabled rows of the given namespaces, regardless of type. */
+    protected List<Configure> queryConfigureByNamespace(String... namespaces) {
+        return queryConfigureByNamespaceInner(null, namespaces);
+    }
+
+    /**
+     * Queries the enabled rows of the given namespaces, optionally restricted to one type.
+     *
+     * @param type       configurable type to filter on; no type filter is added when empty
+     * @param namespaces namespaces to query
+     * @return the matching rows, never {@code null}
+     */
+    protected List<Configure> queryConfigureByNamespaceInner(String type, String... namespaces) {
+        JDBCDriver resource = createResouce();
+        try {
+            // The old rule engine table names the column "name_space" instead of "namespace".
+            String namespace = "namespace";
+            if (isCompatibilityOldRuleEngine && AbstractComponent.JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME.equals(tableName)) {
+                namespace = "name_space";
+            }
+            String sql = "SELECT * FROM `" + tableName + "` WHERE " + namespace + " in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
+            if (StringUtil.isNotEmpty(type)) {
+                // NOTE(review): type is concatenated into the statement; callers must not pass
+                // untrusted values here.
+                sql = sql + " and type='" + type + "'";
+            }
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces));
+            sql = SQLUtil.parseIbatisSQL(jsonObject, sql);
+            List<Map<String, Object>> result = resource.queryForList(sql);
+            if (result == null) {
+                return new ArrayList<>();
+            }
+            return convert2Configure(result);
+        } finally {
+            if (resource != null) {
+                resource.destroy();
+            }
+        }
+    }
+
+    @Override
+    public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+        List<Configure> configures = queryConfigureByNamespace(namespaces);
+        return convert(configures);
+    }
+
+    /** Prints the statement generated for a sample namespace list; manual debugging aid. */
+    public static void main(String[] args) {
+        String[] namespaces = new String[] {"rule1", null};
+        String sql = "SELECT * FROM `dipper_configure` WHERE namespace in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces));
+        sql = SQLUtil.parseIbatisSQL(jsonObject, sql);
+        System.out.println(sql);
+    }
+
+    /**
+     * Persists a configurable with the statement produced by {@code AbstractConfigurable.createSQL}.
+     *
+     * @param configure configurable to store
+     * @throws RuntimeException wrapping any failure raised while executing the statement
+     */
+    protected void saveOrUpdate(IConfigurable configure) {
+        JDBCDriver jdbcDataSource = createResouce();
+        String sql = AbstractConfigurable.createSQL(configure, this.tableName);
+        try {
+            jdbcDataSource.executeInsert(sql);
+        } catch (Exception e) {
+            // Pass the exception to the logger so the stack trace is not lost.
+            LOG.error("DBConfigureService saveOrUpdate error,sqlnode:" + sql, e);
+            throw new RuntimeException(e);
+        } finally {
+            if (jdbcDataSource != null) {
+                jdbcDataSource.destroy();
+            }
+        }
+    }
+
+    /**
+     * Maps raw result rows to {@link Configure} objects and decrypts their json value.
+     *
+     * @param rows rows as returned by the driver
+     * @return one {@link Configure} per row
+     * @throws RuntimeException when a json value cannot be decrypted
+     */
+    protected List<Configure> convert2Configure(List<Map<String, Object>> rows) {
+        List<Configure> configures = new ArrayList<>(rows.size());
+        for (Map<String, Object> row : rows) {
+            Configure configure = new Configure();
+            Long id = getColumnValue(row, "id");
+            configure.setId(id);
+            Date create = getColumnValue(row, "gmt_create");
+            configure.setGmtCreate(create);
+            Date modify = getColumnValue(row, "gmt_modified");
+            configure.setGmtModified(modify);
+            // Fall back to the column name used by the old rule engine table.
+            String namespace = getColumnValue(row, "namespace");
+            if (StringUtil.isEmpty(namespace)) {
+                namespace = getColumnValue(row, "name_space");
+            }
+            configure.setNameSpace(namespace);
+            String type = getColumnValue(row, "type");
+            configure.setType(type);
+            String name = getColumnValue(row, "name");
+            configure.setName(name);
+            String jsonValue = getColumnValue(row, "json_value");
+            try {
+                jsonValue = AESUtil.aesDecrypt(jsonValue, ConfigureFileKey.SECRECY);
+            } catch (Exception e) {
+                LOG.error("can't decrypt the value, reason:\t" + e.getCause(), e);
+                throw new RuntimeException(e);
+            }
+            configure.setJsonValue(jsonValue);
+            configures.add(configure);
+        }
+        return configures;
+    }
+
+    /**
+     * Reads a column and casts it to the type expected by the caller.
+     *
+     * @param row        result row
+     * @param columnName column to read
+     * @return the value, {@code null} when the column is absent or null; a
+     *         {@link java.math.BigInteger} is converted to {@link Long}
+     */
+    @SuppressWarnings("unchecked")
+    protected <T> T getColumnValue(Map<String, Object> row, String columnName) {
+        Object value = row.get(columnName);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof java.math.BigInteger) {
+            return (T)Long.valueOf(value.toString());
+        }
+        return (T)value;
+    }
+
+    /** Creates a driver for the configured connection; callers are expected to destroy it. */
+    protected JDBCDriver createResouce() {
+        return DriverBuilder.createDriver(this.jdbcdriver, this.url, this.userName, this.password);
+    }
+
+    public void setJdbcdriver(String jdbcdriver) {
+        this.jdbcdriver = jdbcdriver;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /** Loads the driver class, falling back to the component default; failures are only logged. */
+    private void regJdbcDriver(String jdbcdriver) {
+        try {
+            if (StringUtil.isEmpty(jdbcdriver)) {
+                jdbcdriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+            }
+            Class.forName(jdbcdriver);
+        } catch (ClassNotFoundException e) {
+            LOG.error("DBConfigureService regJdbcDriver ClassNotFoundException error", e);
+        } catch (Exception e) {
+            LOG.error("DBConfigureService regJdbcDriver error", e);
+        }
+    }
+
+    /**
+     * Reads the JDBC settings from the component properties and registers the driver.
+     *
+     * @param properties properties holding the JDBC driver, url, user name, password, table name
+     *                   and the old-rule-engine compatibility switch
+     */
+    @Override
+    public void initProperty(Properties properties) {
+        this.jdbcdriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
+        regJdbcDriver(jdbcdriver);
+        this.url = properties.getProperty(AbstractComponent.JDBC_URL);
+        this.userName = properties.getProperty(AbstractComponent.JDBC_USERNAME);
+        this.password = properties.getProperty(AbstractComponent.JDBC_PASSWORD);
+        String configuredTable = properties.getProperty(AbstractComponent.JDBC_TABLE_NAME);
+        String compatibility = properties.getProperty(AbstractComponent.JDBC_COMPATIBILITY_OLD_RULEENGINE);
+        // Any non-empty value enables the switch, except an explicit "false" which used to turn
+        // it on as well.
+        if (StringUtil.isNotEmpty(compatibility) && !"false".equalsIgnoreCase(compatibility.trim())) {
+            this.isCompatibilityOldRuleEngine = true;
+        }
+        if (StringUtil.isNotEmpty(configuredTable)) {
+            this.tableName = configuredTable;
+        }
+        // The password is deliberately left out of the log line.
+        LOG.info(
+            "Properties resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName);
+    }
+
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        saveOrUpdate(configurable);
+    }
+
+    @Override
+    protected void updateConfigurable(IConfigurable configurable) {
+        saveOrUpdate(configurable);
+    }
+
+    /**
+     * Loads the configurables of one type from the namespace held by this service.
+     *
+     * @param type configurable type to load
+     * @return the configurables, cast to the type requested by the caller
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        List<Configure> configures = queryConfigureByNamespaceInner(type, namespace);
+        List<IConfigurable> configurables = convert(configures);
+        List<T> result = new ArrayList<>();
+        for (IConfigurable configurable : configurables) {
+            // Unchecked: the caller chooses T and is responsible for asking for a matching type.
+            result.add((T)configurable);
+        }
+        return result;
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java
new file mode 100644
index 0000000..77b82e0
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+
+import java.util.Properties;
+
+/**
+ * Parent-aware configurable service whose own and parent services are both backed by the
+ * database through {@link DBConfigureService}.
+ */
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.DEFAULT_SERVICE_NAME)
+public class DBSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    /**
+     * Builds the parent service and this service from the same properties.
+     *
+     * @param property properties carrying the JDBC connection settings
+     */
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        DBConfigureService parentService = new DBConfigureService(property);
+        this.parentConfigureService = parentService;
+        DBConfigureService ownService = new DBConfigureService(property);
+        this.configureService = ownService;
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java
new file mode 100644
index 0000000..c0e9f53
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Factory for {@link JDBCDriver} instances.
+ */
+public class DriverBuilder {
+
+    private static final Log LOG = LogFactory.getLog(DriverBuilder.class);
+
+    public static final String DEFALUT_JDBC_DRIVER = "com.mysql.jdbc.Driver";
+
+    private static final Map<String, JDBCDriver> dataSourceMap = new ConcurrentHashMap<>();
+
+    private static AtomicInteger count = new AtomicInteger(0);
+
+    /**
+     * Creates a driver from the JDBC settings that the configurable component loaded from the
+     * properties file; this is the connection dipper uses by default. To reach another database
+     * use {@link #createDriver(String, String, String, String)}.
+     *
+     * @return an initialized driver
+     */
+    public static JDBCDriver createDriver() {
+        return createDriver(
+            ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_DRIVER),
+            ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_URL),
+            ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_USERNAME),
+            ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_PASSWORD));
+    }
+
+    /**
+     * Creates and initializes a driver for the given connection settings.
+     *
+     * <p>When a DB proxy class name is configured, that class is instantiated through its
+     * {@code (url, userName, password, driver)} constructor; otherwise a plain {@link JDBCDriver}
+     * is built.
+     *
+     * @param driver   JDBC driver class name; the MySQL driver is used when empty
+     * @param url      JDBC connection url
+     * @param userName database user
+     * @param password database password
+     * @return an initialized driver
+     * @throws RuntimeException when the configured proxy class cannot be instantiated
+     */
+    public static JDBCDriver createDriver(String driver, final String url, final String userName,
+        final String password) {
+        final String driverClassName = StringUtil.isEmpty(driver) ? DEFALUT_JDBC_DRIVER : driver;
+
+        String proxyClassName = ComponentCreator.getDBProxyClassName();
+        if (StringUtil.isEmpty(proxyClassName)) {
+            // No proxy configured: load the driver class and assemble a plain JDBCDriver.
+            ReflectUtil.forClass(driverClassName);
+            JDBCDriver plainDriver = new JDBCDriver();
+            LOG.debug("jdbcdriver=" + driverClassName + ",url=" + url);
+            plainDriver.setJdbcDriver(driverClassName);
+            plainDriver.setUrl(url);
+            plainDriver.setUserName(userName);
+            plainDriver.setPassword(password);
+            plainDriver.init();
+            return plainDriver;
+        }
+
+        Class<?> proxyClass = ReflectUtil.forClass(proxyClassName);
+        try {
+            Constructor<?> proxyConstructor =
+                proxyClass.getConstructor(String.class, String.class, String.class, String.class);
+            JDBCDriver proxyDriver = (JDBCDriver)proxyConstructor.newInstance(url, userName, password,
+                driverClassName);
+            proxyDriver.init();
+            return proxyDriver;
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Joins the connection settings into a single string.
+     *
+     * @param url      JDBC connection url
+     * @param userName database user
+     * @param password database password
+     * @return the three values separated by underscores
+     */
+    private static String genereateKey(String url, String userName, String password) {
+        StringBuilder key = new StringBuilder();
+        key.append(url).append("_").append(userName).append("_").append(password);
+        return key.toString();
+    }
+
+}
+
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java
new file mode 100644
index 0000000..6be77eb
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
+
+/**
+ * Returns the driver object used to operate the database, and provides methods to check whether
+ * the driver is valid and to destroy it.
+ */
+public interface IDriverBudiler {
+
+ /**
+ * Creates a driver that uses the same data source as the dipper system.
+ *
+ * @return the database driver
+ */
+ IDBDriver createDBDriver();
+
+ /**
+ * Checks whether the driver is valid.
+ *
+ * @return true when the driver is valid
+ */
+ boolean isValidate();
+
+ /**
+ * Destroys the driver.
+ */
+ void destroy();
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java
new file mode 100644
index 0000000..356bce6
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import org.apache.rocketmq.streams.common.channel.sink.ISink;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.SingleConnectionDataSource;
+import org.springframework.jdbc.support.GeneratedKeyHolder;
+import org.springframework.jdbc.support.KeyHolder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper around the common database operations declared by {@link IDBDriver}.
+ *
+ * <p>The object is a configurable, so it can be serialized for storage and network transfer. The
+ * database parameters may be configured as names whose actual values live in the configuration
+ * file.
+ */
+public class JDBCDriver extends BasedConfigurable implements IDriverBudiler, IDBDriver {
+    private String jdbcDriver = DriverBuilder.DEFALUT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    protected transient javax.sql.DataSource dataSource;
+    /** Lazily created delegate; volatile because it is published through a check-lock-check. */
+    private transient volatile IDBDriver dbDriver = null;
+
+    /**
+     * @param url      JDBC connection url
+     * @param userName database user
+     * @param password database password
+     * @param driver   JDBC driver class name; the default driver is kept when empty
+     */
+    public JDBCDriver(String url, String userName, String password,
+        String driver) {
+        setType(ISink.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+        if (StringUtil.isNotEmpty(driver)) {
+            this.jdbcDriver = driver;
+        }
+    }
+
+    public JDBCDriver() {
+        setType(ISink.TYPE);
+    }
+
+    /**
+     * Returns the delegate driver, creating it on first use.
+     *
+     * @return the shared delegate
+     */
+    protected IDBDriver createOrGetDriver() {
+        IDBDriver driver = dbDriver;
+        if (driver == null) {
+            synchronized (this) {
+                driver = dbDriver;
+                if (driver == null) {
+                    driver = createDBDriver();
+                    if (dataSource == null) {
+                        dataSource = createDBDataSource();
+                    }
+                    // Publish only after the delegate is fully set up.
+                    dbDriver = driver;
+                }
+            }
+        }
+        return driver;
+    }
+
+    /**
+     * Creates a new delegate backed by a freshly created data source.
+     *
+     * @return a driver that runs every statement through a {@link JdbcTemplate}
+     */
+    @Override
+    public IDBDriver createDBDriver() {
+        final javax.sql.DataSource source = createDBDataSource();
+        return new IDBDriver() {
+            private final JdbcTemplate jdbcTemplate = new JdbcTemplate(source);
+
+            @Override
+            public int update(String sql) {
+                return jdbcTemplate.update(sql);
+            }
+
+            @Override
+            public void execute(String sql) {
+                jdbcTemplate.execute(sql);
+            }
+
+            @Override
+            public List<Map<String, Object>> queryForList(String sql) {
+                return jdbcTemplate.queryForList(sql);
+            }
+
+            @Override
+            public Map<String, Object> queryOneRow(String sql) {
+                return jdbcTemplate.queryForMap(sql);
+            }
+
+            /**
+             * Executes an insert and returns the generated key.
+             *
+             * @param sql insert statement
+             * @return the generated key, or 0 when there is none or more than one
+             * @throws RuntimeException wrapping any failure, with the statement in the message
+             */
+            @Override
+            public long executeInsert(String sql) {
+                try {
+                    KeyHolder keyHolder = new GeneratedKeyHolder();
+                    jdbcTemplate.update(con -> con.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS), keyHolder);
+                    if (keyHolder.getKeyList() == null || keyHolder.getKeyList().size() > 1 || keyHolder.getKey() == null) {
+                        return 0;
+                    }
+                    return keyHolder.getKey().longValue();
+                } catch (Exception e) {
+                    String errorMsg = "execute builder error ,the builder is " + sql + ". the error msg is " + e.getMessage();
+                    throw new RuntimeException(errorMsg, e);
+                }
+            }
+
+            @Override
+            public void executSqls(String... sqls) {
+                jdbcTemplate.batchUpdate(sqls);
+            }
+
+            @Override
+            public void executSqls(Collection<String> sqlCollection) {
+                if (sqlCollection == null || sqlCollection.isEmpty()) {
+                    return;
+                }
+                executSqls(sqlCollection.toArray(new String[0]));
+            }
+
+            /**
+             * Fetches the data in batches until the full result has been read.
+             *
+             * @param sql       executable SQL without a limit clause; anything from the first
+             *                  ';' onwards is dropped
+             * @param batchSize number of rows per batch, must be positive
+             * @return all rows
+             * @throws IllegalArgumentException when batchSize is not positive
+             */
+            @Override
+            public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) {
+                // A batch size of 0 would never meet the stop condition and loop forever.
+                if (batchSize <= 0) {
+                    throw new IllegalArgumentException("batchSize must be positive, but was " + batchSize);
+                }
+                int terminator = sql.indexOf(';');
+                String baseSql = terminator >= 0 ? sql.substring(0, terminator) : sql;
+                List<Map<String, Object>> rows = new ArrayList<>();
+                // long, so the offset cannot overflow on very large tables.
+                long offset = 0;
+                List<Map<String, Object>> batchResult;
+                do {
+                    batchResult = queryForList(baseSql + " limit " + offset + "," + batchSize);
+                    rows.addAll(batchResult);
+                    offset += batchSize;
+                    // A short batch means the previous query reached the end of the data.
+                } while (batchResult.size() >= batchSize);
+                return rows;
+            }
+        };
+    }
+
+    /**
+     * Creates a single-connection data source and stores it in {@link #dataSource}.
+     *
+     * <p>NOTE(review): every call replaces the field without destroying the previous data source.
+     *
+     * @return the new data source
+     */
+    protected javax.sql.DataSource createDBDataSource() {
+        SingleConnectionDataSource singleConnection = new SingleConnectionDataSource(url, userName, password, true);
+        // The driver class has to be set explicitly: when packaged standalone the driver was
+        // otherwise reported as not found, even though the MySQL driver jar was bundled.
+        singleConnection.setDriverClassName(jdbcDriver);
+        singleConnection.setSuppressClose(true);
+        this.dataSource = singleConnection;
+        return singleConnection;
+    }
+
+    /**
+     * Checks that a connection can be obtained from the data source.
+     *
+     * @return false when obtaining the connection raises an {@link SQLException}, true otherwise
+     */
+    @Override
+    public boolean isValidate() {
+        try {
+            if (dataSource == null) {
+                dataSource = createDBDataSource();
+            }
+            // Close the probe connection so that other data source types do not leak it; the
+            // data source built here is created with suppressClose enabled.
+            try (java.sql.Connection ignored = dataSource.getConnection()) {
+                return true;
+            }
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    /** Destroys the data source when it is a {@link SingleConnectionDataSource}. */
+    @Override
+    public void destroy() {
+        if (dataSource instanceof SingleConnectionDataSource) {
+            ((SingleConnectionDataSource)dataSource).destroy();
+        }
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    @Override
+    public int update(String sql) {
+        return createOrGetDriver().update(sql);
+    }
+
+    @Override
+    public void execute(String sql) {
+        createOrGetDriver().execute(sql);
+    }
+
+    @Override
+    public List<Map<String, Object>> queryForList(String sql) {
+        return createOrGetDriver().queryForList(sql);
+    }
+
+    @Override
+    public Map<String, Object> queryOneRow(String sql) {
+        return createOrGetDriver().queryOneRow(sql);
+    }
+
+    @Override
+    public long executeInsert(String sql) {
+        return createOrGetDriver().executeInsert(sql);
+    }
+
+    @Override
+    public void executSqls(String... sqls) {
+        createOrGetDriver().executSqls(sqls);
+    }
+
+    @Override
+    public void executSqls(Collection<String> sqls) {
+        createOrGetDriver().executSqls(sqls);
+    }
+
+    @Override
+    public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) {
+        return createOrGetDriver().batchQueryBySql(sql, batchSize);
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java
new file mode 100644
index 0000000..3e4548c
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.batchloader;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+
+/**
+ * 多线程批量加载数据,每加载一批数据后,通过IRowOperator回调接口处理数据 需要有递增的字段,这个字段有索引,不重复,如id字段
+ */
+public class BatchRowLoader {
+ private static final Log LOG = LogFactory.getLog(BatchRowLoader.class);
+ protected static final int MAX_LINE = 5000;//每个批次最大行数,根据这个值划分并行任务
+ protected static ExecutorService executorService = null;
+ protected String idFieldName;//配置字段名称,这个字段的值是数字的,且是递增的
+ protected String sql;//查询的sql语句,类似select * from table where idFieldName>#{idFieldName=0} order by idFieldName.不要加limit,系统会自动添加
+ protected int batchSize = 1000;//每批从数据库加载的数据量
+ protected IRowOperator dataRowProcessor;//加载的数据由这个回调接口处理
+ private JDBCDriver jdbcDriver;
+
+ public BatchRowLoader(String idFieldName, String sql, IRowOperator dataRowProcessor) {
+ this.idFieldName = idFieldName;
+ this.sql = sql;
+ this.dataRowProcessor = dataRowProcessor;
+ this.jdbcDriver = DriverBuilder.createDriver();
+ executorService = new ThreadPoolExecutor(20, 20,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(1000));
+ }
+
+ public void startLoadData() {
+ try {
+ String statisticalSQL = sql;
+ int startIndex = sql.toLowerCase().indexOf("from");
+ statisticalSQL = "select count(1) as c, min(" + idFieldName + ") as min, max(" + idFieldName + ") as max "
+ + sql.substring(startIndex);
+ List<Map<String, Object>> rows = jdbcDriver.queryForList(statisticalSQL);
+ Map<String, Object> row = rows.get(0);
+ int count = Integer.valueOf(row.get("c").toString());
+ if (count == 0) {
+ LOG.warn("there is no data during execute sql: " + statisticalSQL);
+ return;
+ }
+
+ IntValueKV intValueKV = new IntValueKV(count);
+ //int maxBatch=count/maxSyncCount;//每1w条数据,一个并发。如果数据量比较大,为了提高性能,并行执行
+
+ long min = Long.valueOf(row.get("min").toString());
+ long max = Long.valueOf(row.get("max").toString());
+ int maxSyncCount = count / MAX_LINE + 1;
+ long step = (max - min + 1) / maxSyncCount;
+ CountDownLatch countDownLatch = new CountDownLatch(maxSyncCount + 1);
+ AtomicInteger finishedCount = new AtomicInteger(0);
+ String taskSQL = null;
+ if (sql.indexOf(" where ") != -1) {
+ taskSQL = sql + " and " + idFieldName + ">#{startIndex} and " + idFieldName + "<=#{endIndex} order by "
+ + idFieldName + " limit " + batchSize;
+ } else {
+ taskSQL = sql + " where " + idFieldName + ">#{startIndex} and " + idFieldName
+ + "<=#{endIndex} order by " + idFieldName + " limit " + batchSize;
+ }
+
+ int i = 0;
+ for (; i < maxSyncCount; i++) {
+ FetchDataTask
+ fetchDataTask = new FetchDataTask(taskSQL, (min - 1) + step * i,
+ (min - 1) + step * (i + 1), countDownLatch, finishedCount, jdbcDriver, count);
+ executorService.execute(fetchDataTask);
+ }
+ FetchDataTask
+ fetchDataTask = new FetchDataTask(taskSQL, (min - 1) + step * i, (min - 1) + step * (i + 1),
+ countDownLatch, finishedCount, jdbcDriver, count);
+ executorService.execute(fetchDataTask);
+
+ countDownLatch.await();
+
+ LOG.info(getClass().getSimpleName() + " load data finish, load data line size is " + count);
+ } catch (Exception e) {
+ LOG.error("failed loading data batch!", e);
+ } finally {
+ jdbcDriver.destroy();
+ }
+ }
+
+ protected class FetchDataTask implements Runnable {
+ long startIndex;
+ long endIndex;
+ String sql;
+ CountDownLatch countDownLatch;
+ JDBCDriver resource;
+ AtomicInteger finishedCount;//完成了多少条
+ int totalSize;//一共有多少条数据
+
+ public FetchDataTask(String sql, long startIndex, long endIndex, CountDownLatch countDownLatch,
+ AtomicInteger finishedCount, JDBCDriver resource, int totalSize) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.countDownLatch = countDownLatch;
+ this.sql = sql;
+ this.finishedCount = finishedCount;
+ this.resource = resource;
+ this.totalSize = totalSize;
+ }
+
+ @Override
+ public void run() {
+ long currentIndex = startIndex;
+ JSONObject msg = new JSONObject();
+ msg.put("endIndex", endIndex);
+ while (true) {
+ try {
+
+ msg.put("startIndex", currentIndex);
+
+ String sql = SQLUtil.parseIbatisSQL(msg, this.sql);
+ List<Map<String, Object>> rows = resource.queryForList(sql);
+ if (rows == null || rows.size() == 0) {
+ break;
+ }
+ currentIndex = Long.valueOf(rows.get(rows.size() - 1).get(idFieldName).toString());
+
+ int size = rows.size();
+ int count = finishedCount.addAndGet(size);
+ double progress = (double)count / (double)totalSize;
+ progress = progress * 100;
+ System.out.println(" finished count is " + count + " the total count is " + totalSize + ", the progress is " + String.format("%.2f", progress) + "%");
+ if (size < batchSize) {
+ if (size > 0) {
+
+ doProcess(rows);
+ }
+ break;
+ }
+ doProcess(rows);
+ } catch (Exception e) {
+ throw new RuntimeException("put data error ", e);
+ }
+ }
+
+ countDownLatch.countDown();
+ }
+ }
+
+ private void doProcess(List<Map<String, Object>> rows) {
+ for (Map<String, Object> row : rows) {
+ dataRowProcessor.doProcess(row);
+ }
+ }
+}
+
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java
new file mode 100644
index 0000000..f67393f
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.batchloader;
+
+import java.util.Map;
+
+/**
+ * Callback that operates on a single row of data.
+ */
+public interface IRowOperator {
+
+ /**
+ * Processes one row of data.
+ *
+ * @param row the row to process (presumably keyed by column name -- confirm against the caller)
+ */
+ void doProcess(Map<String, Object> row);
+
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
new file mode 100644
index 0000000..20529b0
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.orm;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.configurable.IFieldProcessor;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.CollectionUtil;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+
+/**
+ * 轻量级的orm框架,如果pojo和table 符合驼峰的命名和下划线的命名规范,可以自动实现对象的orm
+ */
+public class ORMUtil {
+ private static final Log LOG = LogFactory.getLog(ORMUtil.class);
+
+ public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass) {
+ return queryForObject(sql, paras, convertClass, null, null, null);
+ }
+
+ /**
+ * 通过sql查询一个唯一的对象出来,返回数据多于一条会报错
+ *
+ * @param sql 查询语句
+ * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @param convertClass,如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @param url 数据库连接url
+ * @param userName 用户名
+ * @param password 密码
+ * @param <T>
+ * @return 转换后的对象
+ */
+ public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass, String url, String userName,
+ String password) {
+ List<T> result = queryForList(sql, paras, convertClass, url, userName, password);
+ if (result == null || result.size() == 0) {
+ return null;
+ }
+ if (result.size() > 1) {
+ throw new RuntimeException("expect only one row, actual " + result.size() + ". the builder is " + sql);
+ }
+ return result.get(0);
+ }
+
+ /**
+ * 根据sql查询一批对象
+ *
+ * @param sql 查询语句
+ * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @param convertClass 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @param <T>
+ * @return 返回对象列表
+ */
+ public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass) {
+ return queryForList(sql, paras, convertClass, null, null, null);
+ }
+
+ public static boolean hasConfigueDB() {
+ return ComponentCreator.getProperties().getProperty(ConfigureFileKey.JDBC_URL) != null;
+ }
+
+ /**
+ * @param sql 查询语句
+ * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @param convertClass 需要转换成的对象类。类的字段应该符合列名的命名规范,下划线换成驼峰形式
+ * @param url 数据库连接url
+ * @param userName 用户名
+ * @param password 密码
+ * @param <T>
+ * @return 返回对象列表
+ */
+ public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass, String url, String userName,
+ String password) {
+ sql = SQLUtil.parseIbatisSQL(paras, sql);
+ JDBCDriver dataSource = null;
+ try {
+ if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+ dataSource = DriverBuilder.createDriver();
+ } else {
+ dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+ }
+
+ List<Map<String, Object>> rows = dataSource.queryForList(sql);
+ List<T> result = new ArrayList();
+ for (Map<String, Object> row : rows) {
+ T t = convert(row, convertClass);
+ result.add(t);
+ }
+ return result;
+ } catch (Exception e) {
+ String errorMsg = ("query for list error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+ LOG.error(errorMsg);
+ e.printStackTrace();
+ throw new RuntimeException(errorMsg, e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
+ }
+ }
+
+ /**
+ * 执行sql,sql中可以有mybatis的参数#{name}
+ *
+ * @param sql insert语句
+ * @param paras 可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+ * @return
+ */
+ public static boolean executeSQL(String sql, Object paras) {
+ if (paras != null) {
+ sql = SQLUtil.parseIbatisSQL(paras, sql);
+ }
+ JDBCDriver dataSource = null;
+ try {
+ dataSource = DriverBuilder.createDriver();
+ dataSource.execute(sql);
+ return true;
+ } catch (Exception e) {
+ String errorMsg = ("execute sql error ,the sql is " + sql + ". the error msg is " + e.getMessage());
+ LOG.error(errorMsg);
+ e.printStackTrace();
+ throw new RuntimeException(errorMsg, e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
+ }
+ }
+
+ /**
+ * 把一个对象的字段拼接成where条件,如果字段值为null,不拼接
+ *
+ * @param object 带拼接的对象
+ * @param fieldNames 需要拼接的字段名,如果为null,返回null
+ * @return where 部分的sql
+ */
+ public static String createQueryWhereSql(Object object, String... fieldNames) {
+ if (fieldNames == null || fieldNames.length == 0) {
+ return "";
+ }
+ StringBuilder stringBuilder = new StringBuilder();
+ boolean isFirst = true;
+ for (String fieldName : fieldNames) {
+ Object value = ReflectUtil.getBeanFieldValue(object, fieldName);
+ if (object != null && value == null) {
+ continue;
+ }
+ if (isFirst) {
+ isFirst = false;
+ } else {
+ stringBuilder.append(" and ");
+ }
+ String columnName = getColumnNameFromFieldName(fieldName);
+ stringBuilder.append(" " + columnName + "=#{" + fieldName + "} ");
+ }
+ return stringBuilder.toString();
+ }
+
+ /**
+ * 把一行数据转换成一个对象,符合驼峰的命名和下划线的命名规范
+ *
+ * @param row 一行数据
+ * @param clazz 待转化的对象类型
+ * @param <T>
+ * @return 转化对象
+ */
+ public static <T> T convert(Map<String, Object> row, Class<T> clazz) {
+ T t = ReflectUtil.forInstance(clazz);
+ Iterator<Map.Entry<String, Object>> it = row.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, Object> entry = it.next();
+ String columnName = entry.getKey();
+ Object value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ String fieldName = getFieldNameFromColumnName(columnName);
+ DataType datatype = DataTypeUtil.createFieldDataType(clazz, fieldName);
+ Object columnValue = datatype.convert(value);
+ ReflectUtil.setBeanFieldValue(t, fieldName, columnValue);
+ }
+ return t;
+ }
+
+ /**
+ * 把列名转换成字段名称,把下划线转化成驼峰
+ *
+ * @param columnName
+ * @return
+ */
+ protected static String getFieldNameFromColumnName(String columnName) {
+ String[] values = columnName.split("_");
+ if (values.length == 1) {
+ return columnName;
+ }
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(values[0]);
+ for (int i = 1; i < values.length; i++) {
+ String value = values[i];
+ value = value.substring(0, 1).toUpperCase() + value.substring(1);
+ stringBuilder.append(value);
+ }
+ return stringBuilder.toString();
+ }
+
+ /**
+ * 对象批量替换,会生成replace into语句,多个对象会拼接成一个sql,提升效率
+ *
+ * @param values 待插入对象
+ */
+ public static void batchReplaceInto(Collection values) {
+ List list = new ArrayList<>();
+ list.addAll(values);
+ batchReplaceInto(list);
+ }
+
+ public static void batchReplaceInto(Object... valueArray) {
+ List values = new ArrayList();
+ if (valueArray != null) {
+ for (Object value : valueArray) {
+ values.add(value);
+ }
+ }
+ batchReplaceInto(values);
+ }
+
+ public static void batchReplaceInto(List<?> values) {
+ batchIntoByFlag(values, 1);
+ }
+
+ /**
+ * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率
+ *
+ * @param values 待插入对象
+ */
+ public static void batchIgnoreInto(List<?> values) {
+ batchIntoByFlag(values, -1);
+ }
+
+ /**
+ * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率
+ *
+ * @param values 待插入对象
+ */
+ public static void batchInsertInto(List<?> values) {
+ batchIntoByFlag(values, 0);
+ }
+
+ /**
+ * 批量插入对象,多个对象会拼接成一个sql flag==1 then replace into flag=-1 then insert ignore int flag=0 then insert int
+ *
+ * @param values
+ * @param flag
+ */
+ protected static void batchIntoByFlag(List<?> values, int flag) {
+ if (CollectionUtil.isEmpty(values)) {
+ return;
+ }
+ Object object = values.get(0);
+ Map<String, Object> paras = new HashMap<>(16);
+ MetaData metaData = createMetaDate(object, paras);
+ boolean containsIdField = false;
+ if (metaData.getIdFieldName() != null) {
+ for (Object o : values) {
+ Object id = ReflectUtil.getDeclaredField(o, metaData.getIdFieldName());
+ if (id == null) {
+ containsIdField = false;
+ break;
+ }
+ if (id instanceof Number) {
+ if (Long.valueOf(id.toString()) == 0) {
+ containsIdField = false;
+ break;
+ }
+ }
+ if (id instanceof String) {
+ String idStr = (String)id;
+ if (StringUtil.isEmpty(idStr)) {
+ containsIdField = false;
+ break;
+ }
+ }
+ }
+ }
+
+ String sql = null;
+ if (flag == 0) {
+ sql = SQLUtil.createInsertSql(metaData, paras, containsIdField);
+ } else if (flag == 1) {
+ sql = SQLUtil.createInsertSql(metaData, paras, containsIdField);
+ } else if (flag == -1) {
+ sql = SQLUtil.createIgnoreInsertSql(metaData, paras, containsIdField);
+ } else {
+ throw new RuntimeException("the flag is not valdate " + flag);
+ }
+
+ List<Map<String, Object>> rows = new ArrayList<>();
+ for (int i = 1; i < values.size(); i++) {
+ Map<String, Object> row = createRow(metaData, values.get(i));
+ rows.add(row);
+ }
+ String valuesSQL = SQLUtil.createInsertValuesSQL(metaData, rows);
+ sql = sql + valuesSQL + " ON DUPLICATE KEY UPDATE " + SQLUtil.createDuplicateKeyUpdateSQL(metaData);
+ ;
+ JDBCDriver dataSource = DriverBuilder.createDriver();
+ try {
+ dataSource.execute(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
+ }
+ }
+
+ private static Map<String, Object> createRow(MetaData metaData, Object object) {
+ Map<String, Object> row = new HashMap<>();
+ ReflectUtil.scanFields(object, new IFieldProcessor() {
+ @Override
+ public void doProcess(Object o, Field field) {
+ String fieldName = field.getName();
+ String columnName = getColumnNameFromFieldName(fieldName);
+ Object value = ReflectUtil.getBeanFieldValue(o, fieldName);
+ row.put(columnName, value);
+
+ }
+ });
+ return row;
+ }
+
+ public static void replaceInto(Object object) {
+ replaceInto(object, null, null, null);
+ }
+
+ /**
+ * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线。如果有重复到会被替换成最新的
+ *
+ * @param object
+ * @param url
+ * @param userName
+ * @param password
+ */
+ public static void replaceInto(Object object, String url, String userName, String password) {
+ Map<String, Object> paras = new HashMap<>();
+ if (Entity.class.isInstance(object)) {
+ Entity newEntity = (Entity)object;
+ newEntity.setGmtModified(new Date());
+ if (newEntity.getGmtCreate() == null) {
+ newEntity.setGmtCreate(new Date());
+ }
+ }
+ MetaData metaData = createMetaDate(object, paras);
+ String sql = SQLUtil.createReplacesInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+ JDBCDriver dataSource = null;
+ try {
+ if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+ dataSource = DriverBuilder.createDriver();
+ } else {
+ dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+ }
+ long id = dataSource.executeInsert(sql);
+ if (Entity.class.isInstance(object)) {
+ Entity newEntity = (Entity)object;
+ newEntity.setId(id);
+ }
+ } catch (Exception e) {
+ String errorMsg = ("replace into error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg, e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
+ }
+ }
+
+ public static void insertInto(Object object, boolean ignoreRepeateRow) {
+ insertInto(object, ignoreRepeateRow, null, null, null);
+ }
+
+ /**
+ * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线
+ *
+ * @param object
+ * @param ignoreRepeateRow,如果是重复数据,则不插入。基于唯一建做判断
+ * @param url
+ * @param userName
+ * @param password
+ */
+ public static void insertInto(Object object, boolean ignoreRepeateRow, String url, String userName,
+ String password) {
+ Map<String, Object> paras = new HashMap<>();
+ if (Entity.class.isInstance(object)) {
+ Entity newEntity = (Entity)object;
+ newEntity.setGmtCreate(DateUtil.getCurrentTime());
+ newEntity.setGmtModified(DateUtil.getCurrentTime());
+ }
+ MetaData metaData = createMetaDate(object, paras);
+ String sql = null;
+ if (ignoreRepeateRow) {
+ sql = SQLUtil.createIgnoreInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+ } else {
+ sql = SQLUtil.createInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+ }
+ JDBCDriver dataSource = null;
+ try {
+ if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+ dataSource = DriverBuilder.createDriver();
+ } else {
+ dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+ }
+ long id = dataSource.executeInsert(sql);
+ if (Entity.class.isInstance(object)) {
+ Entity newEntity = (Entity)object;
+ newEntity.setId(id);
+ }
+ } catch (Exception e) {
+ String errorMsg = ("insert into error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg, e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
+ }
+ }
+
+ /**
+ * 创建meta信息
+ *
+ * @param object
+ * @param paras
+ * @return
+ */
+ public static MetaData createMetaDate(Object object, Map<String, Object> paras) {
+ MetaData metaData = MetaData.createMetaDate(object, paras);
+ return metaData;
+ }
+
+ public static String getTableName(Class clazz) {
+ return getColumnNameFromFieldName(clazz.getSimpleName());
+ }
+
+ /**
+ * 把驼峰转换成下划线的形式
+ *
+ * @param para
+ * @return
+ */
+ protected static String getColumnNameFromFieldName(String para) {
+ return MetaData.getColumnNameFromFieldName(para);
+ }
+
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java
new file mode 100644
index 0000000..1f42470
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.operator;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.Changeable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.topology.ChainStage;
+import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
+import org.apache.rocketmq.streams.common.topology.stages.NewSQLChainStage;
+import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * SQL operator: runs one SQL statement per message. The statement may contain {@code #{name}} variables which
+ * are replaced with values from the message body; the query result is stored back into the message body under
+ * {@link #DEFALUT_DATA_KEY}.
+ */
+public class SQLOperator extends BasedConfigurable implements IStreamOperator<IMessage, IMessage>, IStageBuilder<ChainStage> {
+    private static final Log LOG = LogFactory.getLog(SQLOperator.class);
+    public static final String DEFALUT_DATA_KEY = "data";
+
+    @ENVDependence
+    protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    /**
+     * SQL template with ibatis-style variables, e.g. {@code select * from table where name=#{name=chris}}.
+     * The template itself is never modified; a parsed copy is produced for every message.
+     */
+    @Changeable
+    protected String sql;
+
+    public SQLOperator() {
+        setType(IStreamOperator.TYPE);
+    }
+
+    /**
+     * @param sql      SQL template
+     * @param url      database connection url
+     * @param userName database user
+     * @param password database password
+     */
+    public SQLOperator(String sql, String url, String userName, String password) {
+        this();
+        this.sql = sql;
+        this.url = url;
+        this.password = password;
+        this.userName = userName;
+    }
+
+    /**
+     * Creates an operator whose connection fields hold the keys {@code <prefix>.url}, {@code <prefix>.password}
+     * and {@code <prefix>.userName} (presumably resolved through the {@code @ENVDependence} mechanism -- confirm).
+     *
+     * @param sql            SQL template
+     * @param dbInfoNamePrex key prefix; when empty the configure name is used, and when that is empty too the
+     *                       simple class name
+     */
+    public SQLOperator(String sql, String dbInfoNamePrex) {
+        this();
+        if (StringUtil.isEmpty(dbInfoNamePrex)) {
+            dbInfoNamePrex = getConfigureName();
+        }
+        if (StringUtil.isEmpty(dbInfoNamePrex)) {
+            dbInfoNamePrex = this.getClass().getSimpleName();
+        }
+        this.sql = sql;
+        this.url = dbInfoNamePrex + ".url";
+        this.password = dbInfoNamePrex + ".password";
+        this.userName = dbInfoNamePrex + ".userName";
+    }
+
+    /**
+     * Substitutes the message body into the SQL template, runs the query and stores the rows in the message body
+     * under {@link #DEFALUT_DATA_KEY}.
+     */
+    @Override
+    public IMessage doMessage(IMessage message, AbstractContext context) {
+        String querySQL = SQLUtil.parseIbatisSQL(message.getMessageBody(), sql);
+        List<Map<String, Object>> result = query(querySQL);
+        message.getMessageBody().put(DEFALUT_DATA_KEY, result);
+        return message;
+    }
+
+    /**
+     * Runs the given statement on a freshly created driver and destroys the driver afterwards.
+     *
+     * @param querySQL the statement to execute, with all variables already substituted
+     * @return the rows returned by the driver
+     */
+    protected List<Map<String, Object>> query(String querySQL) {
+        JDBCDriver dataSource = null;
+        try {
+            dataSource = createDBDataSource();
+            // Execute the parsed statement, not the raw template held in the sql field.
+            return dataSource.queryForList(querySQL);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    /** Creates a driver from the configured driver class, url, user name and password. */
+    public JDBCDriver createDBDataSource() {
+        return DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+    }
+
+    @Override
+    public void addConfigurables(PipelineBuilder pipelineBuilder) {
+        pipelineBuilder.addConfigurables(this);
+    }
+
+    /** Wraps this operator in a {@link NewSQLChainStage}. */
+    @Override
+    public ChainStage createStageChain(PipelineBuilder pipelineBuilder) {
+        NewSQLChainStage sqlChainStage = new NewSQLChainStage();
+        sqlChainStage.setMessageProcessor(this);
+        return sqlChainStage;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java
new file mode 100644
index 0000000..3d5e51b
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+/**
+ * Configurable test fixture describing a person with a few scalar, list and map attributes.
+ */
+public class Person extends BasedConfigurable {
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds the sample person "Chris" inside the given namespace.
+     *
+     * @param namespace namespace assigned to the created configurable
+     * @return a fully populated sample person
+     */
+    public static Person createPerson(String namespace) {
+        List<String> homes = new ArrayList<>();
+        homes.add("huilongguan");
+        homes.add("shangdi");
+
+        Map<String, Integer> children = new HashMap<>();
+        children.put("yuanyahan", 8);
+        children.put("yuanruxi", 4);
+
+        Person sample = new Person();
+        sample.setNameSpace(namespace);
+        sample.setType("person");
+        sample.setConfigureName("Chris");
+        sample.setName("Chris");
+        sample.setAddresses(homes);
+        sample.setChildName2Age(children);
+        sample.setMale(true);
+        sample.setAge(18);
+        return sample;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Person{");
+        text.append("name='").append(name).append('\'');
+        text.append(", age=").append(age);
+        text.append(", isMale=").append(isMale);
+        text.append(", addresses=").append(addresses);
+        text.append(", childName2Age=").append(childName2Age);
+        text.append('}');
+        return text.toString();
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return this.age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    /**
+     * Clones this person via {@code super.clone()}; when cloning is not supported the problem is printed and
+     * {@code null} is returned.
+     */
+    @Override
+    public Object clone() {
+        try {
+            return (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e);
+            return null;
+        }
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java
new file mode 100644
index 0000000..3baa65d
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+import org.apache.rocketmq.streams.db.Person;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * 数据库的存储,需要配置存储的连接参数,请先完成配置,后执行单元用例 如果未建表,可以通过Configure.createTableSQL() 获取建表语句,创建表后,测试
+ */
+public class DBSupportParentConfigureServiceTest {
+ private String URL = "";
+ protected String USER_NAME = "";
+ protected String PASSWORD = "";
+ protected String TABLE_NAME = "dipper_configure_source";
+
+ @Test
+ public void testDBConfigurableService() {
+ String namespace = "streams.db.configuable";
+
+ //正式使用时,在配置文件配置
+ ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME);
+
+ //如果表不存在,创建表
+ String sql = (Configure.createTableSQL(TABLE_NAME));
+ DriverBuilder.createDriver().execute(sql);
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ configurableComponent.insert(createPerson(namespace));
+ configurableComponent.refreshConfigurable(namespace);
+ Person person = configurableComponent.queryConfigurable("person", "peronName");
+ assertTrue(person != null);
+ }
+
+ /**
+ * 创建configuable对象
+ *
+ * @param namespace
+ * @return
+ */
+ protected Person createPerson(String namespace) {
+ Person person = new Person();
+ person.setName("chris");
+ person.setAge(18);
+ person.setNameSpace(namespace);
+ person.setConfigureName("peronName");
+ person.setType("person");
+ return person;
+ }
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java
new file mode 100644
index 0000000..e9b1fa2
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.orm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.db.Person;
+import org.junit.Test;
+
+public class ORMUtilTest {
+ private String URL = "";
+ protected String USER_NAME = "";
+ protected String PASSWORD = "";
+
+ public ORMUtilTest() {
+ //正式使用时,在配置文件配置
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password
+ }
+
+ @Test
+ public void testInsert() {
+ String namespace = "org.apache.configuable.test";
+ List<Person> personList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ personList.add(createPerson(namespace, "chris" + i));
+ }
+ /**
+ * 不带数据库连接信息(url,userName,Password),使用ConfiguableComponet的连接信息
+ */
+ ORMUtil.batchIgnoreInto(personList);//批量插入,如果有唯一键冲突,替换
+ ORMUtil.batchIgnoreInto(personList);//批量插入,如果有唯一键冲突,忽略
+ ORMUtil.batchIntoByFlag(personList, 0);////批量插入,如果有唯一键冲突,跑错
+ }
+
+ @Test
+ public void testQueryList() {
+ Map<String, Integer> paras = new HashMap<>();
+ paras.put("age", 18);
+ List<Person> personList = ORMUtil.queryForList("select * from person where age >${age} limit 100", paras, Person.class);
+ }
+
+ @Test
+ public void testQueryOneRow() {
+ Person personPara = new Person();
+ personPara.setAge(18);
+ personPara.setName("chris1");
+ Person person = ORMUtil.queryForObject("select * from person where age =${age} and name='${name}' ", personPara, Person.class, URL, USER_NAME, PASSWORD);
+ }
+
+ /**
+ * 创建configuable对象
+ *
+ * @param namespace
+ * @return
+ */
+ protected Person createPerson(String namespace, String name) {
+ Person person = new Person();
+ person.setName(name);
+ person.setAge(18);
+ person.setNameSpace(namespace);
+ person.setConfigureName("peronName");
+ person.setType("person");
+ return person;
+ }
+}
diff --git a/rocketmq-streams-transport-minio/pom.xml b/rocketmq-streams-transport-minio/pom.xml
new file mode 100755
index 0000000..510b8cc
--- /dev/null
+++ b/rocketmq-streams-transport-minio/pom.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rocketmq-streams-transport-minio</artifactId>
+ <packaging>jar</packaging>
+ <name>ROCKETMQ STREAMS :: transport-minio</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-serviceloader</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.minio</groupId>
+ <artifactId>minio</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml
new file mode 100644
index 0000000..af11f77
--- /dev/null
+++ b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5">
+ <output url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+ <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
+ <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+ <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+</module>
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java
new file mode 100644
index 0000000..c2a6884
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.transport.minio;
+
+ import org.apache.rocketmq.streams.common.component.ComponentCreator;
+ import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+ import org.apache.rocketmq.streams.common.model.ServiceName;
+ import org.apache.rocketmq.streams.common.transport.AbstractFileTransport;
+ import org.apache.rocketmq.streams.common.transport.IFileTransport;
+ import org.apache.rocketmq.streams.common.utils.FileUtil;
+ import org.apache.rocketmq.streams.common.utils.StringUtil;
+ import com.google.auto.service.AutoService;
+ import io.minio.MinioClient;
+
+ import java.io.BufferedReader;
+ import java.io.BufferedWriter;
+ import java.io.File;
+ import java.io.FileWriter;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.nio.file.Files;
+ import java.nio.file.StandardCopyOption;
+
+@AutoService(IFileTransport.class)
+@ServiceName(MinioFileTransport.NAME)
+public class MinioFileTransport extends AbstractFileTransport {
+ public static final String NAME = "minio";
+ protected String ak;
+ protected String sk;
+ protected String endpoint;
+ protected String dirpperDir;
+ protected MinioClient minioClient;
+
+ public MinioFileTransport() {
+ this.ak = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_AK);
+ this.sk = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_SK);
+ this.endpoint = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_ENDPOINT);
+ this.dirpperDir = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_DIPPER_DIR);
+ if (StringUtil.isEmpty(this.dirpperDir)) {
+ this.dirpperDir = "dipper_files";
+ }
+ }
+
+ @Override
+ public File download(String remoteFileName, String localDir, String localFileName) {
+ MinioClient minioClient = getOrCreateMinioClient();
+ BufferedWriter bw = null;
+ BufferedReader br = null;
+ try {
+ InputStream input = minioClient.getObject(dirpperDir, remoteFileName);
+ File file = new File(FileUtil.concatFilePath(localDir, localFileName));
+ bw = new BufferedWriter(new FileWriter(file));
+ br = new BufferedReader(new InputStreamReader(input));
+ String line = br.readLine();
+ while (line != null) {
+ bw.write(line);
+ line = br.readLine();
+ }
+ bw.flush();
+ return file;
+ } catch (Exception e) {
+ throw new RuntimeException("dowload file error " + dirpperDir + "/" + remoteFileName, e);
+
+ } finally {
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public Boolean upload(File file, String remoteFileName) {
+ MinioClient minioClient = getOrCreateMinioClient();
+ try {
+ minioClient.putObject(dirpperDir, remoteFileName, file.getAbsolutePath());
+ } catch (Exception e) {
+ throw new RuntimeException("upload file error " + dirpperDir + "/" + remoteFileName, e);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean delete(String remoteFileName) {
+ MinioClient minioClient = getOrCreateMinioClient();
+ try {
+ minioClient.removeObject(dirpperDir, remoteFileName);
+ } catch (Exception e) {
+ throw new RuntimeException("delete file error " + dirpperDir + "/" + remoteFileName, e);
+ }
+ return true;
+ }
+
+ protected MinioClient getOrCreateMinioClient() {
+ if (this.minioClient == null) {
+ synchronized (this) {
+ if (minioClient == null) {
+ try {
+ MinioClient minioClient = new MinioClient(endpoint, ak, sk);
+ boolean existDir = minioClient.bucketExists(dirpperDir);
+
+ if (!existDir) {
+ minioClient.makeBucket(dirpperDir);
+ }
+ this.minioClient = minioClient;
+ } catch (Exception e) {
+ throw new RuntimeException("create minio client error", e);
+ }
+
+ }
+ }
+ }
+ return this.minioClient;
+ }
+}
+
+
diff --git a/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java
new file mode 100644
index 0000000..a3ebf6b
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.aliyun.yundun.dipper.configurable;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.configurable.model.Person;
+
+ /**
+  * Ad-hoc checks built around the sample {@link Person} and two local text files.
+  */
+ public class DataTpyeTest {
+     /**
+      * Calls {@code toJson()} on a sample person, feeds the result to {@code toObject(...)} on a fresh
+      * instance and prints that instance.
+      */
+     @Test
+     public void testDataType() {
+         Person original = Person.createPerson("com.dipper.test");
+         String jsonValue = original.toJson();
+
+         Person restored = new Person();
+         restored.toObject(jsonValue);
+         System.out.println(restored);
+     }
+
+     /**
+      * Cross-matches the lines of two files by substring containment (in either direction), prints every
+      * line of the first file that matched nothing, then prints the number of matching pairs.
+      *
+      * <p>NOTE(review): both paths are absolute paths under one developer's home directory, so this test
+      * depends on files that are not part of the repository. The {@code set} below is filled but never read.
+      */
+     @Test
+     public void testV2() {
+         Set<String> set = new HashSet<>();
+         set.add("北斗");
+         set.add("福建jz");
+         set.add("甘肃jz");
+         set.add("广东省气象micaps云");
+         set.add("贵州公安科信");
+         set.add("贵州警务云");
+         set.add("杭州税友");
+         set.add("江西公安大数据平台");
+         set.add("昆仑项目");
+         set.add("新华网");
+         set.add("浙江气象高时空分辨率气象预报专有云");
+         List<String> projectLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt");
+         List<String> cloudLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt");
+         int matchedPairs = 0;
+         for (String projectLine : projectLines) {
+             boolean matched = false;
+             for (String cloudLine : cloudLines) {
+                 // Counts every matching pair, not every matched line: there is deliberately no break.
+                 if (cloudLine.contains(projectLine) || projectLine.contains(cloudLine)) {
+                     matched = true;
+                     matchedPairs++;
+                 }
+             }
+             if (!matched) {
+                 System.out.println(projectLine);
+             }
+         }
+         System.out.println(matchedPairs);
+     }
+ }
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java
new file mode 100644
index 0000000..a1570c3
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.junit.Test;
+
+ /**
+  * Ad-hoc checks built around the sample {@link Person} and two local text files.
+  */
+ public class DataTpyeTest {
+     /**
+      * Passes the {@code toJson()} output of a sample person to {@code toObject(...)} on a new instance
+      * and prints the new instance.
+      */
+     @Test
+     public void testDataType() {
+         Person source = Person.createPerson("com.dipper.test");
+         String jsonValue = source.toJson();
+
+         Person target = new Person();
+         target.toObject(jsonValue);
+         System.out.println(target);
+     }
+
+     /**
+      * For every line of the first file, looks for lines of the second file that contain it or are
+      * contained in it; unmatched lines are printed, followed by the total number of matching pairs.
+      *
+      * <p>NOTE(review): the two input files are addressed by absolute paths under one developer's home
+      * directory and are not part of the repository. The {@code set} below is populated but never used.
+      */
+     @Test
+     public void testV2() {
+         Set<String> set = new HashSet<>();
+         set.add("北斗");
+         set.add("福建jz");
+         set.add("甘肃jz");
+         set.add("广东省气象micaps云");
+         set.add("贵州公安科信");
+         set.add("贵州警务云");
+         set.add("杭州税友");
+         set.add("江西公安大数据平台");
+         set.add("昆仑项目");
+         set.add("新华网");
+         set.add("浙江气象高时空分辨率气象预报专有云");
+         List<String> v2Lines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt");
+         List<String> zyyLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt");
+         int pairCount = 0;
+         for (String candidate : v2Lines) {
+             boolean found = false;
+             for (String other : zyyLines) {
+                 boolean related = other.contains(candidate) || candidate.contains(other);
+                 if (related) {
+                     // No break on purpose: every matching pair is counted.
+                     found = true;
+                     pairCount++;
+                 }
+             }
+             if (!found) {
+                 System.out.println(candidate);
+             }
+         }
+         System.out.println(pairCount);
+     }
+ }
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
new file mode 100644
index 0000000..04b99bb
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+ /**
+  * Sample configurable used by the tests: a person with a name, an age, a gender flag, a list of
+  * addresses and a map from child name to child age.
+  */
+ public class Person extends BasedConfigurable {
+     private String name;
+     private int age;
+     private Boolean isMale;
+     private List<String> addresses;
+     private Map<String, Integer> childName2Age;
+
+     /**
+      * Builds the fixed sample person "Chris" (type "person", age 18, male, two addresses, two children).
+      *
+      * @param namespace namespace assigned to the created person
+      * @return the populated sample person
+      */
+     public static Person createPerson(String namespace) {
+         List<String> addressList = new ArrayList<>();
+         addressList.add("huilongguan");
+         addressList.add("shangdi");
+
+         Map<String, Integer> children = new HashMap<>();
+         children.put("yuanyahan", 8);
+         children.put("yuanruxi", 4);
+
+         Person chris = new Person();
+         chris.setNameSpace(namespace);
+         chris.setType("person");
+         chris.setConfigureName("Chris");
+         chris.setName("Chris");
+         chris.setAddresses(addressList);
+         chris.setChildName2Age(children);
+         chris.setMale(true);
+         chris.setAge(18);
+         return chris;
+     }
+
+     /**
+      * Renders the fields declared in this class in a {@code Person{...}} form.
+      */
+     @Override
+     public String toString() {
+         StringBuilder text = new StringBuilder("Person{");
+         text.append("name='").append(name).append('\'');
+         text.append(", age=").append(age);
+         text.append(", isMale=").append(isMale);
+         text.append(", addresses=").append(addresses);
+         text.append(", childName2Age=").append(childName2Age).append('}');
+         return text.toString();
+     }
+
+     /** @return the person's name */
+     public String getName() {
+         return this.name;
+     }
+
+     /** @param name the person's name */
+     public void setName(String name) {
+         this.name = name;
+     }
+
+     /** @return the person's age */
+     public int getAge() {
+         return this.age;
+     }
+
+     /** @param age the person's age */
+     public void setAge(int age) {
+         this.age = age;
+     }
+
+     /** @return the {@code isMale} flag; {@code null} until it has been set */
+     public Boolean getMale() {
+         return this.isMale;
+     }
+
+     /** @param male new value of the {@code isMale} flag */
+     public void setMale(Boolean male) {
+         this.isMale = male;
+     }
+
+     /** @return the internal address list (not a copy) */
+     public List<String> getAddresses() {
+         return this.addresses;
+     }
+
+     /** @param addresses address list to store; the reference is kept as is */
+     public void setAddresses(List<String> addresses) {
+         this.addresses = addresses;
+     }
+
+     /** @return the internal child-name to age map (not a copy) */
+     public Map<String, Integer> getChildName2Age() {
+         return this.childName2Age;
+     }
+
+     /** @param childName2Age child-name to age map to store; the reference is kept as is */
+     public void setChildName2Age(Map<String, Integer> childName2Age) {
+         this.childName2Age = childName2Age;
+     }
+ }
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
new file mode 100644
index 0000000..709978a
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * Test fixture configurable describing a person: name, age, gender flag, addresses and the ages of the
+  * person's children keyed by child name.
+  */
+ public class Person extends BasedConfigurable {
+     private String name;
+     private int age;
+     private Boolean isMale;
+     private List<String> addresses;
+     private Map<String, Integer> childName2Age;
+
+     /**
+      * Creates the fixed sample person "Chris" (type "person", age 18, male, two addresses, two children).
+      *
+      * @param namespace namespace assigned to the created person
+      * @return the populated sample person
+      */
+     public static Person createPerson(String namespace) {
+         List<String> homes = new ArrayList<>();
+         homes.add("huilongguan");
+         homes.add("shangdi");
+
+         Map<String, Integer> agesByChild = new HashMap<>();
+         agesByChild.put("yuanyahan", 8);
+         agesByChild.put("yuanruxi", 4);
+
+         Person sample = new Person();
+         sample.setNameSpace(namespace);
+         sample.setType("person");
+         sample.setConfigureName("Chris");
+         sample.setName("Chris");
+         sample.setAddresses(homes);
+         sample.setChildName2Age(agesByChild);
+         sample.setMale(true);
+         sample.setAge(18);
+         return sample;
+     }
+
+     /**
+      * Renders the fields declared in this class in a {@code Person{...}} form.
+      */
+     @Override
+     public String toString() {
+         StringBuilder out = new StringBuilder("Person{");
+         out.append("name='").append(name).append('\'');
+         out.append(", age=").append(age);
+         out.append(", isMale=").append(isMale);
+         out.append(", addresses=").append(addresses);
+         out.append(", childName2Age=").append(childName2Age).append('}');
+         return out.toString();
+     }
+
+     /** @return the person's name */
+     public String getName() {
+         return this.name;
+     }
+
+     /** @param name the person's name */
+     public void setName(String name) {
+         this.name = name;
+     }
+
+     /** @return the person's age */
+     public int getAge() {
+         return this.age;
+     }
+
+     /** @param age the person's age */
+     public void setAge(int age) {
+         this.age = age;
+     }
+
+     /** @return the {@code isMale} flag; {@code null} until it has been set */
+     public Boolean getMale() {
+         return this.isMale;
+     }
+
+     /** @param male new value of the {@code isMale} flag */
+     public void setMale(Boolean male) {
+         this.isMale = male;
+     }
+
+     /** @return the internal address list (not a copy) */
+     public List<String> getAddresses() {
+         return this.addresses;
+     }
+
+     /** @param addresses address list to store; the reference is kept as is */
+     public void setAddresses(List<String> addresses) {
+         this.addresses = addresses;
+     }
+
+     /** @return the internal child-name to age map (not a copy) */
+     public Map<String, Integer> getChildName2Age() {
+         return this.childName2Age;
+     }
+
+     /** @param childName2Age child-name to age map to store; the reference is kept as is */
+     public void setChildName2Age(Map<String, Integer> childName2Age) {
+         this.childName2Age = childName2Age;
+     }
+ }
diff --git a/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties
new file mode 100644
index 0000000..598511e
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties
@@ -0,0 +1,7 @@
+dipper.configurable.service.type=resource_support_parent
+dipper.channle.ak=xxxxxx
+dipper.channle.sk=xxxxxx
+dipper.rds.jdbc.driver=com.mysql.jdbc.Driver
+dipper.rds.jdbc.url=xxxxxxx
+dipper.rds.jdbc.username=xxxxxx
+dipper.rds.jdbc.password=xxxxx
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/log4j.xml b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "http://toolkit.alibaba-inc.com/dtd/log4j/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+ </layout>
+ <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="LevelMin" value="INFO"/>
+ <param name="LevelMax" value="ERROR"/>
+ </filter>
+ </appender>
+
+ <root>
+ <priority value="INFO"/>
+ <appender-ref ref="Console"/>
+ </root>
+
+</log4j:configuration>
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt
new file mode 100644
index 0000000..34a186f
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt
@@ -0,0 +1,11 @@
+paserBySplit(@,uuid,file_path,pid,ppid,pfile_path,group_name,group_id,user_name,uid,euid,egroup_id,time,cmd_line,index,perm,tty,pcmdline,sid,cwd,filename);
+addRandom(messageId,10);
+rename(groupname,group_name);
+rename(username,user_name);
+rename(seq,index);
+rename(egourpid,egroup_id);
+rename(filepath,file_path);
+rename(groupid,group_id);
+rename(pfilename,pfile_name);
+rename(safe_mode,perm);
+rename(cmdline,cmd_line);
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/python_script.py b/rocketmq-streams-transport-minio/src/test/resources/python_script.py
new file mode 100644
index 0000000..f4e7252
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/python_script.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+# coding=utf-8
+import json;
+import re;
+import time;
+regex = '^/(.*)/\w+'
+pattern = re.compile(regex)
+
+def pythonTest(*processLine):
+ try:
+ jsonObject = json.loads(processLine[0])
+
+ if jsonObject.has_key('filepath'):
+ filePath = jsonObject['filepath']
+ match = pattern.search(filePath)
+ if match:
+ return match.group(1)
+ else:
+ pass # print "does not has key filepath"
+ except BaseException as e:
+ pass # print "process one line cause exception %s" %e
+ return "does not match"