You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/04 11:16:06 UTC

[rocketmq-streams] 13/42: add module db-operator、transport-minio

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 9c898ecf0c5301f1ae1d0c144f15f96d2151ba96
Author: muyang <li...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:21:28 2021 +0800

    add module db-operator、transport-minio
---
 rocketmq-streams-db-operator/pom.xml               |  34 ++
 .../rocketmq-streams-db-operator.iml               |  16 +
 .../streams/db/configuable/DBConfigureService.java | 282 ++++++++++++
 .../DBSupportParentConfigureService.java           |  37 ++
 .../rocketmq/streams/db/driver/DriverBuilder.java  | 111 +++++
 .../rocketmq/streams/db/driver/IDriverBudiler.java |  36 ++
 .../rocketmq/streams/db/driver/JDBCDriver.java     | 277 ++++++++++++
 .../db/driver/batchloader/BatchRowLoader.java      | 179 ++++++++
 .../db/driver/batchloader/IRowOperator.java        |  33 ++
 .../rocketmq/streams/db/driver/orm/ORMUtil.java    | 490 +++++++++++++++++++++
 .../rocketmq/streams/db/operator/SQLOperator.java  | 178 ++++++++
 .../org/apache/rocketmq/streams/db/Person.java     | 110 +++++
 .../DBSupportParentConfigureServiceTest.java       |  74 ++++
 .../streams/db/driver/orm/ORMUtilTest.java         |  86 ++++
 rocketmq-streams-transport-minio/pom.xml           |  25 ++
 .../rocketmq-streams-transport-minio.iml           |  17 +
 .../transport/minio/MinioFileTransport.java        | 141 ++++++
 .../yundun/dipper/configurable/DataTpyeTest.java   |  70 +++
 .../streams/configuable/model/DataTpyeTest.java    |  68 +++
 .../rocketmq/streams/configuable/model/Person.java |  97 ++++
 .../streams/configurable/model/Person.java         |  97 ++++
 .../component/ConfigurableComponent.properties     |   7 +
 .../src/test/resources/log4j.xml                   |  20 +
 .../src/test/resources/pro-function.txt            |  11 +
 .../src/test/resources/python_script.py            |  22 +
 25 files changed, 2518 insertions(+)

diff --git a/rocketmq-streams-db-operator/pom.xml b/rocketmq-streams-db-operator/pom.xml
new file mode 100755
index 0000000..9a9b17b
--- /dev/null
+++ b/rocketmq-streams-db-operator/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-streams</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-streams-db-operator</artifactId>
+    <name>ROCKETMQ STREAMS :: db-operator</name>
+    <packaging>jar</packaging>
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-configurable</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jdbc</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+
+
+    </dependencies>
+</project>
diff --git a/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml
new file mode 100644
index 0000000..38ffb14
--- /dev/null
+++ b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5">
+    <output url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+    <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+      <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+      <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>
\ No newline at end of file
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
new file mode 100644
index 0000000..ef319eb
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.interfaces.IPropertyEnable;
+import org.apache.rocketmq.streams.common.utils.AESUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+
+import java.util.*;
+
+/**
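+ * Stores Configurable objects in a database, the mode commonly used in production. The database parameters can be set in the configuration file; on startup ConfiguableComponent wraps them in a Properties object and calls the DBConfigureService(Properties properties) constructor to create the instance.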
+ */
+
+public class DBConfigureService extends AbstractConfigurableService implements IPropertyEnable {
+
+    private static final Log LOG = LogFactory.getLog(DBConfigureService.class);
+    private String jdbcdriver;
+    private String url;
+    private String userName;
+    private String password;
+    private String tableName = "dipper_configure";
+    @Deprecated
+    private boolean isCompatibilityOldRuleEngine = false;//兼容老规则引擎使用,正常场景不需要理会
+
+    public DBConfigureService(String jdbcdriver, String url, String userName, String password) {
+        this(jdbcdriver, url, userName, password, null);
+    }
+
+    public DBConfigureService(String jdbcdriver, String url, String userName, String password, String tableName) {
+        this.url = url;
+        this.jdbcdriver = jdbcdriver;
+        this.userName = userName;
+        this.password = password;
+        // The 4-arg constructor delegates with a null table name; keep the default table instead of overwriting it.
+        this.tableName = StringUtil.isNotEmpty(tableName) ? tableName : this.tableName;
+        LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName); // password intentionally not logged
+        regJdbcDriver(jdbcdriver);
+    }
+
+    public DBConfigureService() {
+    }
+
+    /**
+     * @param properties
+     */
+    public DBConfigureService(Properties properties) {
+        super(properties);
+        initProperty(properties);
+    }
+
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        try {
+            List<Configure> configures = selectOpening(namespace);
+            List<IConfigurable> configurables = convert(configures);
+            result.setConfigurables(configurables);
+            result.setQuerySuccess(true);// marks whether the query succeeded; per the original note, the configuration is not updated when it failed
+        } catch (Exception e) {
+            result.setQuerySuccess(false);
+            LOG.error("load configurable error ", e);
+        }
+        return result;
+    }
+
+    protected List<Configure> selectOpening(String namespace) {
+        return queryConfigureByNamespace(namespace);
+    }
+
+    protected List<Configure> queryConfigureByNamespace(String... namespaces) {
+        return queryConfigureByNamespaceInner(null, namespaces);
+    }
+
+    protected List<Configure> queryConfigureByNamespaceInner(String type, String... namespaces) {
+        JDBCDriver resource = createResouce();
+        try {
+            String namespace = "namespace";
+            if (isCompatibilityOldRuleEngine && AbstractComponent.JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME.equals(tableName)) {
+                namespace = "name_space";
+            }
+            String sql = "SELECT * FROM `" + tableName + "` WHERE " + namespace + " in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
+            if (StringUtil.isNotEmpty(type)) {
+                sql = sql + " and type='" + type + "'";
+            }
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces));
+            sql = SQLUtil.parseIbatisSQL(jsonObject, sql);
+            // NOTE(review): type is concatenated into the SQL text unescaped above; confirm callers never pass untrusted input
+            List<Map<String, Object>> result = resource.queryForList(sql);
+            if (result == null) {
+                return new ArrayList<Configure>();
+            }
+            // turn the raw rows into Configure objects
+            return convert2Configure(result);
+        } finally {
+            if (resource != null) {
+                resource.destroy();
+            }
+        }
+    }
+
+    @Override
+    public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+        List<Configure> configures = queryConfigureByNamespace(namespaces);
+        List<IConfigurable> configurables = convert(configures);
+        return configurables;
+    }
+
+    public static void main(String[] args) {
+        String[] namespaces = new String[] {"rule1", null};
+        String sql = "SELECT * FROM `dipper_configure` WHERE namespace in (" + SQLUtil.createInSql(namespaces) + ") and status =1";
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces));
+        sql = SQLUtil.parseIbatisSQL(jsonObject, sql);
+        System.out.println(sql);
+    }
+
+    protected void saveOrUpdate(IConfigurable configure) {
+        JDBCDriver jdbcDataSource = createResouce();
+        String sql = AbstractConfigurable.createSQL(configure, this.tableName);
+        try {
+            jdbcDataSource.executeInsert(sql);
+        } catch (Exception e) {
+            LOG.error("DBConfigureService saveOrUpdate error,sqlnode:" + sql, e); // pass e so the stack trace is logged next to the failing SQL
+            throw new RuntimeException(e);
+        } finally {
+            if (jdbcDataSource != null) {
+                jdbcDataSource.destroy();
+            }
+        }
+    }
+
+    protected List<Configure> convert2Configure(List<Map<String, Object>> rows) {
+        List<Configure> configures = new ArrayList<Configure>();
+        for (Map<String, Object> row : rows) { // one Configure per result row
+            Configure configure = new Configure();
+            Long id = getColumnValue(row, "id");
+            configure.setId(id);
+            Date create = getColumnValue(row, "gmt_create");
+            configure.setGmtCreate(create);
+            Date modify = getColumnValue(row, "gmt_modified");
+            configure.setGmtModified(modify);
+            String namespace = getColumnValue(row, "namespace");
+            if (StringUtil.isEmpty(namespace)) { // fall back to the alternative column name
+                namespace = getColumnValue(row, "name_space");
+            }
+            configure.setNameSpace(namespace);
+            String type = getColumnValue(row, "type");
+            configure.setType(type);
+            String name = getColumnValue(row, "name");
+            configure.setName(name);
+            String jsonValue = getColumnValue(row, "json_value");
+            try {
+                jsonValue = AESUtil.aesDecrypt(jsonValue, ConfigureFileKey.SECRECY);
+            } catch (Exception e) {
+                LOG.error("can't decrypt the value, reason:\t" + e.getMessage(), e); // getCause() is often null; log the exception itself
+                throw new RuntimeException(e);
+            }
+            configure.setJsonValue(jsonValue);
+            configures.add(configure);
+        }
+        return configures;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> T getColumnValue(Map<String, Object> row, String columnName) {
+        // Fetch the cell once; whatever the map returns for the column, null included, is handled below.
+        Object cell = row.get(columnName);
+        if (cell instanceof java.math.BigInteger) {
+            // BigInteger cells are handed out as Long, converted through their decimal text.
+            return (T)Long.valueOf(cell.toString());
+        }
+
+        // Everything else, null included, is returned as stored; the caller picks the static type.
+        return (T)cell;
+    }
+
+    protected JDBCDriver createResouce() {
+        // Every call asks DriverBuilder for a driver built from the connection settings held by this service.
+        return DriverBuilder.createDriver(this.jdbcdriver, this.url, this.userName, this.password);
+    }
+
+    public void setJdbcdriver(String jdbcdriver) {
+        this.jdbcdriver = jdbcdriver;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    private void regJdbcDriver(String jdbcdriver) {
+        try {
+            if (StringUtil.isEmpty(jdbcdriver)) {
+                jdbcdriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+            }
+            Class.forName(jdbcdriver);
+        } catch (ClassNotFoundException e) {
+            LOG.error("DBConfigureService regJdbcDriver ClassNotFoundException error", e);
+        } catch (Exception e) {
+            LOG.error("DBConfigureService regJdbcDriver error", e);
+        }
+    }
+
+    @Override
+    public void initProperty(Properties properties) {
+        this.jdbcdriver = properties.getProperty(AbstractComponent.JDBC_DRIVER);
+        regJdbcDriver(jdbcdriver);
+        this.url = properties.getProperty(AbstractComponent.JDBC_URL);
+        this.userName = properties.getProperty(AbstractComponent.JDBC_USERNAME);
+        this.password = properties.getProperty(AbstractComponent.JDBC_PASSWORD);
+        String tableName = properties.getProperty(AbstractComponent.JDBC_TABLE_NAME);
+        String isCompatibilityOldRuleEngine = properties.getProperty(AbstractComponent.JDBC_COMPATIBILITY_OLD_RULEENGINE);
+        if (StringUtil.isNotEmpty(isCompatibilityOldRuleEngine)) {
+            this.isCompatibilityOldRuleEngine = true;
+        }
+        if (StringUtil.isNotEmpty(tableName)) {
+            this.tableName = tableName;
+        }
+        // The password is intentionally left out of the log line: credentials must never end up in log files.
+        LOG.info(
+            "Properties resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName);
+    }
+
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        saveOrUpdate(configurable);
+    }
+
+    @Override
+    protected void updateConfigurable(IConfigurable configurable) {
+        saveOrUpdate(configurable);
+    }
+
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        // Read the rows of the requested type for the namespace field, then turn them into configurables.
+        List<IConfigurable> loaded = convert(queryConfigureByNamespaceInner(type, namespace));
+        // Re-type element by element; the cast is unchecked, so the caller chooses T.
+        List<T> typed = new ArrayList<>();
+        for (IConfigurable item : loaded) {
+            typed.add((T)item);
+        }
+        return typed;
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java
new file mode 100644
index 0000000..77b82e0
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+
+import java.util.Properties;
+
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.DEFAULT_SERVICE_NAME)
+public class DBSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        this.parentConfigureService = new DBConfigureService(property);
+        this.configureService = new DBConfigureService(property);
+
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java
new file mode 100644
index 0000000..c0e9f53
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
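+ * Creates {@link JDBCDriver} instances, either from the configured JDBC properties or from explicit connection settings.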
+ */
+public class DriverBuilder {
+
+    private static final Log LOG = LogFactory.getLog(DriverBuilder.class);
+
+    public static final String DEFALUT_JDBC_DRIVER = "com.mysql.jdbc.Driver";
+
+    private static final Map<String, JDBCDriver> dataSourceMap = new ConcurrentHashMap<>();
+
+    private static AtomicInteger count = new AtomicInteger(0);
+
+    /**
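+     * Uses the JDBC settings that ConfiguableComponent configures in the properties file; dipper uses this database connection by default. To connect to another database, use the createDriver overload that takes parameters.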
+     *
+     * @return
+     */
+    public static JDBCDriver createDriver() {
+        String driver = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_DRIVER);
+        String url = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_URL);
+        String userName = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_USERNAME);
+        String password = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_PASSWORD);
+        return createDriver(driver, url, userName, password);
+    }
+
+    /**
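+     * Creates a connection from the given database connection settings and returns the JDBCDriver.
+     *
+     * @param driver   database driver class name; when null, MySQL is used by default
+     * @param url      database connection URL
+     * @param userName user name
+     * @param password password
+     * @return JDBCDriver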
+     */
+    public static JDBCDriver createDriver(String driver, final String url, final String userName,
+                                          final String password) {
+        if (StringUtil.isEmpty(driver)) {
+            driver = DEFALUT_JDBC_DRIVER;
+        }
+        String className = ComponentCreator.getDBProxyClassName();
+        if (StringUtil.isNotEmpty(className)) {
+            Class clazz = ReflectUtil.forClass(className);
+            try {
+                Constructor constructor = clazz.getConstructor(
+                    new Class[] {String.class, String.class, String.class, String.class});
+                JDBCDriver abstractDBDataSource = (JDBCDriver)constructor.newInstance(url, userName, password,
+                    driver);
+                abstractDBDataSource.init();
+                return abstractDBDataSource;
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        }
+
+        final String jdbcdriver = driver;
+        ReflectUtil.forClass(jdbcdriver);
+        JDBCDriver resource = new JDBCDriver();
+        LOG.debug("jdbcdriver=" + jdbcdriver + ",url=" + url);
+        resource.setJdbcDriver(jdbcdriver);
+        resource.setUrl(url);
+        resource.setUserName(userName);
+        resource.setPassword(password);
+        resource.init();
+        return resource;
+    }
+
+    /**
+     * Builds the concatenated key string.
+     *
+     * @param url
+     * @param userName
+     * @param password
+     * @return
+     */
+    private static String genereateKey(String url, String userName, String password) {
+        return url + "_" + userName + "_" + password;
+    }
+
+}
+
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java
new file mode 100644
index 0000000..6be77eb
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
+
+/**
+ * Returns the driver object used to operate on the database, and provides methods to check whether the driver is valid and to destroy it.
+ */
+public interface IDriverBudiler {
+
+    /**
+     * Uses the same data source as the dipper system.
+     *
+     * @return
+     */
+    IDBDriver createDBDriver();
+
+    boolean isValidate();
+
+    void destroy();
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java
new file mode 100644
index 0000000..356bce6
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver;
+
+import org.apache.rocketmq.streams.common.channel.sink.ISink;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.SingleConnectionDataSource;
+import org.springframework.jdbc.support.GeneratedKeyHolder;
+import org.springframework.jdbc.support.KeyHolder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
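+ * Wraps common database operations; the core interface it implements is IJdbcTemplate. This object implements the IConfigurable interface, so it can be serialized for storage and sent over the network. Database parameters may be configured as names whose actual values are set in the configuration file.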
+ * <p>
+ */
+public class JDBCDriver extends BasedConfigurable implements IDriverBudiler, IDBDriver {
+    private String jdbcDriver = DriverBuilder.DEFALUT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    protected transient javax.sql.DataSource dataSource;
+    private transient volatile IDBDriver dbDriver = null; // volatile: createOrGetDriver() reads this field outside its synchronized block
+
+    public JDBCDriver(String url, String userName, String password,
+                      String driver) {
+        setType(ISink.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+        if (StringUtil.isNotEmpty(driver)) {
+            this.jdbcDriver = driver;
+        }
+    }
+
+    public JDBCDriver() {
+        setType(ISink.TYPE);
+    }
+
+    protected IDBDriver createOrGetDriver() {
+        if (dbDriver == null) {
+            synchronized (this) {
+                if (dbDriver == null) {
+                    dbDriver = createDBDriver();
+                    if (dataSource == null) {
+                        dataSource = createDBDataSource();
+                    }
+                }
+            }
+        }
+        return dbDriver;
+    }
+
+    @Override
+    public IDBDriver createDBDriver() {
+        javax.sql.DataSource dataSource = createDBDataSource();
+        return new IDBDriver() {
+            private final JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
+
+            @Override
+            public int update(String sql) {
+                return jdbcTemplate.update(sql);
+            }
+
+            @Override
+            public void execute(String sql) {
+                jdbcTemplate.execute(sql);
+            }
+
+            @Override
+            public List<Map<String, Object>> queryForList(String sql) {
+                return jdbcTemplate.queryForList(sql);
+            }
+
+            @Override
+            public Map<String, Object> queryOneRow(String sql) {
+                return jdbcTemplate.queryForMap(sql);
+            }
+
+            @Override
+            public long executeInsert(String sql) {
+                try {
+                    KeyHolder keyHolder = new GeneratedKeyHolder();
+                    jdbcTemplate.update(con -> con.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS), keyHolder);
+                    if (keyHolder.getKeyList() == null || keyHolder.getKeyList().size() > 1 || keyHolder.getKey() == null) {
+                        return 0;
+                    }
+                    return keyHolder.getKey().longValue();
+                } catch (Exception e) {
+                    String errorMsg = "execute builder error ,the builder is " + sql + ". the error msg is " + e.getMessage();
+                    throw new RuntimeException(errorMsg, e);
+                }
+            }
+
+            @Override
+            public void executSqls(String... sqls) {
+                jdbcTemplate.batchUpdate(sqls);
+            }
+
+            @Override
+            public void executSqls(Collection<String> sqlCollection) {
+                if (sqlCollection == null || sqlCollection.size() == 0) {
+                    return;
+                }
+                String[] sqls = new String[sqlCollection.size()];
+                int i = 0;
+                Iterator<String> it = sqlCollection.iterator();
+                while (it.hasNext()) {
+                    String sql = it.next();
+                    sqls[i] = sql;
+                    i++;
+                }
+                executSqls(sqls);
+            }
+
+            /**
+             * Fetches the complete result of a query page by page with "limit offset,size" and merges the pages.
+             * @param sql executable SQL without a limit clause; text from the first ';' onwards is dropped
+             * @param batchSize number of rows per page; must be positive, otherwise IllegalArgumentException is thrown
+             * @return the rows of all pages, in the order they were fetched
+             */
+            @Override
+            public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) {
+                if (batchSize <= 0) {
+                    // With a page size of 0 every page counts as full (0 >= 0) and the loop below would never end.
+                    throw new IllegalArgumentException("batchSize must be positive, but was " + batchSize);
+                }
+                List<Map<String, Object>> rows = new ArrayList<>();
+                String baseSql = sql;
+                if (sql.contains(";")) {
+                    baseSql = sql.substring(0, sql.indexOf(";"));
+                }
+                int offset = 0;
+                List<Map<String, Object>> batchResult = queryForList(baseSql + " limit " + offset + "," + batchSize);
+                while (batchResult.size() >= batchSize) {
+                    rows.addAll(batchResult);
+                    offset += batchSize;
+                    batchResult = queryForList(baseSql + " limit " + offset + "," + batchSize);
+                }
+                rows.addAll(batchResult);
+                return rows;
+            }
+        };
+    }
+
+    protected javax.sql.DataSource createDBDataSource() {
+
+        SingleConnectionDataSource dataSource = new SingleConnectionDataSource(url, userName, password, true);
+
+        dataSource.setDriverClassName(jdbcDriver);// added by Lin Xing, 0221
+        // While integrating with the operations center of a private-cloud deployment, standalone packaging turned out to need this call; otherwise the driver is reported as not found. (The MySQL driver jar is bundled, but it still has to be named explicitly here.)
+        dataSource.setSuppressClose(true);
+        this.dataSource = dataSource;
+        return dataSource;
+    }
+
+    @Override
+    public boolean isValidate() {
+        try {
+            if (dataSource == null) {
+                dataSource = createDBDataSource();
+            }
+            dataSource.getConnection();
+        } catch (SQLException e) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void destroy() {
+        if (dataSource instanceof SingleConnectionDataSource) {
+            SingleConnectionDataSource data = (SingleConnectionDataSource)dataSource;
+            data.destroy();
+        }
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    @Override
+    public int update(String sql) {
+        return createOrGetDriver().update(sql);
+    }
+
+    @Override
+    public void execute(String sql) {
+        createOrGetDriver().execute(sql);
+    }
+
+    @Override
+    public List<Map<String, Object>> queryForList(String sql) {
+        return createOrGetDriver().queryForList(sql);
+    }
+
+    @Override
+    public Map<String, Object> queryOneRow(String sql) {
+        return createOrGetDriver().queryOneRow(sql);
+    }
+
+    @Override
+    public long executeInsert(String sql) {
+        return createOrGetDriver().executeInsert(sql);
+    }
+
+    @Override
+    public void executSqls(String... sqls) {
+        createOrGetDriver().executSqls(sqls);
+    }
+
+    @Override
+    public void executSqls(Collection<String> sqls) {
+        createOrGetDriver().executSqls(sqls);
+    }
+
+    @Override
+    public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) {
+        return createOrGetDriver().batchQueryBySql(sql, batchSize);
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java
new file mode 100644
index 0000000..3e4548c
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.batchloader;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+
+/**
+ * Loads rows from the database with several threads. The id range of the query is cut into slices, every slice is
+ * fetched batch by batch, and each loaded row is handed to the {@link IRowOperator} callback. The query needs a
+ * numeric, increasing, unique and indexed column (for example an id column) to slice on.
+ */
+public class BatchRowLoader {
+    private static final Log LOG = LogFactory.getLog(BatchRowLoader.class);
+    /** Maximum number of rows per slice; the row count divided by this value decides how many tasks are started. */
+    protected static final int MAX_LINE = 5000;
+    /** Thread pool shared by all loaders; created once, on first construction. */
+    protected static ExecutorService executorService = null;
+    /** Name of the numeric, increasing column used to slice and page the data. */
+    protected String idFieldName;
+    /**
+     * Query to run, e.g. {@code select * from table where idFieldName>#{idFieldName=0} order by idFieldName}.
+     * Do not add a limit clause; one is appended automatically.
+     */
+    protected String sql;
+    /** Number of rows fetched from the database per round trip. */
+    protected int batchSize = 1000;
+    /** Callback that receives every loaded row. */
+    protected IRowOperator dataRowProcessor;
+    private JDBCDriver jdbcDriver;
+
+    /**
+     * Creates a loader that uses the driver returned by {@link DriverBuilder#createDriver()}.
+     *
+     * @param idFieldName      name of the numeric, increasing column used for slicing
+     * @param sql              query to run, without a limit clause
+     * @param dataRowProcessor callback invoked for every loaded row
+     */
+    public BatchRowLoader(String idFieldName, String sql, IRowOperator dataRowProcessor) {
+        this.idFieldName = idFieldName;
+        this.sql = sql;
+        this.dataRowProcessor = dataRowProcessor;
+        this.jdbcDriver = DriverBuilder.createDriver();
+        initExecutorService();
+    }
+
+    /**
+     * Creates the shared pool if it does not exist yet. Replacing the pool on every construction would leave the
+     * threads of the previous pool running forever, because nothing ever shuts it down.
+     */
+    private static synchronized void initExecutorService() {
+        if (executorService == null || executorService.isShutdown()) {
+            executorService = new ThreadPoolExecutor(20, 20,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(1000));
+        }
+    }
+
+    /**
+     * Counts the rows, splits the id range into slices, fetches all slices in parallel and blocks until every slice
+     * has finished. Failures are logged, not rethrown. The driver is destroyed when the method returns.
+     */
+    public void startLoadData() {
+        try {
+            int fromIndex = sql.toLowerCase().indexOf("from");
+            if (fromIndex < 0) {
+                throw new IllegalArgumentException("the sql has no from clause: " + sql);
+            }
+            String statisticalSQL =
+                "select count(1) as c, min(" + idFieldName + ") as min, max(" + idFieldName + ") as max "
+                    + sql.substring(fromIndex);
+            List<Map<String, Object>> rows = jdbcDriver.queryForList(statisticalSQL);
+            Map<String, Object> row = rows.get(0);
+            int count = Integer.parseInt(row.get("c").toString());
+            if (count == 0) {
+                LOG.warn("there is no data during execute sql: " + statisticalSQL);
+                return;
+            }
+
+            long min = Long.parseLong(row.get("min").toString());
+            long max = Long.parseLong(row.get("max").toString());
+            int maxSyncCount = count / MAX_LINE + 1;
+            long step = (max - min + 1) / maxSyncCount;
+            // one extra slice picks up the ids lost to the integer division above
+            int taskCount = maxSyncCount + 1;
+            CountDownLatch countDownLatch = new CountDownLatch(taskCount);
+            AtomicInteger finishedCount = new AtomicInteger(0);
+
+            // compare in lower case so that "WHERE" is recognised as well and no second where clause is appended
+            String connector = sql.toLowerCase().contains(" where ") ? " and " : " where ";
+            String taskSQL = sql + connector + idFieldName + ">#{startIndex} and " + idFieldName
+                + "<=#{endIndex} order by " + idFieldName + " limit " + batchSize;
+
+            for (int i = 0; i < taskCount; i++) {
+                FetchDataTask fetchDataTask = new FetchDataTask(taskSQL, (min - 1) + step * i,
+                    (min - 1) + step * (i + 1), countDownLatch, finishedCount, jdbcDriver, count);
+                executorService.execute(fetchDataTask);
+            }
+
+            countDownLatch.await();
+
+            LOG.info(getClass().getSimpleName() + " load data finish, load data line  size is " + count);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller
+            Thread.currentThread().interrupt();
+            LOG.error("failed loading data batch!", e);
+        } catch (Exception e) {
+            LOG.error("failed loading data batch!", e);
+        } finally {
+            jdbcDriver.destroy();
+        }
+    }
+
+    /**
+     * Fetches one id slice {@code (startIndex, endIndex]} batch by batch and passes the rows to the callback.
+     */
+    protected class FetchDataTask implements Runnable {
+        long startIndex;
+        long endIndex;
+        String sql;
+        CountDownLatch countDownLatch;
+        JDBCDriver resource;
+        /** Number of rows loaded so far, shared by all tasks. */
+        AtomicInteger finishedCount;
+        /** Total number of rows to load. */
+        int totalSize;
+
+        public FetchDataTask(String sql, long startIndex, long endIndex, CountDownLatch countDownLatch,
+                             AtomicInteger finishedCount, JDBCDriver resource, int totalSize) {
+            this.startIndex = startIndex;
+            this.endIndex = endIndex;
+            this.countDownLatch = countDownLatch;
+            this.sql = sql;
+            this.finishedCount = finishedCount;
+            this.resource = resource;
+            this.totalSize = totalSize;
+        }
+
+        @Override
+        public void run() {
+            try {
+                long currentIndex = startIndex;
+                JSONObject msg = new JSONObject();
+                msg.put("endIndex", endIndex);
+                while (true) {
+                    msg.put("startIndex", currentIndex);
+
+                    String sql = SQLUtil.parseIbatisSQL(msg, this.sql);
+                    List<Map<String, Object>> rows = resource.queryForList(sql);
+                    if (rows == null || rows.isEmpty()) {
+                        break;
+                    }
+                    // the next batch starts after the last id of this one
+                    currentIndex = Long.parseLong(rows.get(rows.size() - 1).get(idFieldName).toString());
+
+                    int size = rows.size();
+                    int count = finishedCount.addAndGet(size);
+                    double progress = (double)count / (double)totalSize;
+                    progress = progress * 100;
+                    LOG.info(" finished count is " + count + " the total count is " + totalSize
+                        + ", the progress is " + String.format("%.2f", progress) + "%");
+                    doProcess(rows);
+                    if (size < batchSize) {
+                        // a short batch means the slice is exhausted
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("put data error, startIndex=" + startIndex + ", endIndex=" + endIndex, e);
+                throw new RuntimeException("put data error ", e);
+            } finally {
+                // always release the latch, otherwise a failed slice blocks startLoadData forever
+                countDownLatch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Hands every row of the batch to the callback, in order.
+     *
+     * @param rows the rows of one batch
+     */
+    private void doProcess(List<Map<String, Object>> rows) {
+        for (Map<String, Object> row : rows) {
+            dataRowProcessor.doProcess(row);
+        }
+    }
+}
+
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java
new file mode 100644
index 0000000..f67393f
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.batchloader;
+
+import java.util.Map;
+
+/**
+ * Callback that operates on a single row of data.
+ */
+public interface IRowOperator {
+
+    /**
+     * Processes one row of data.
+     *
+     * @param row the row to process, as a map of string keys to values
+     */
+    void doProcess(Map<String, Object> row);
+
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
new file mode 100644
index 0000000..20529b0
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.orm;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.configurable.IFieldProcessor;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.CollectionUtil;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+
+/**
+ * 轻量级的orm框架,如果pojo和table 符合驼峰的命名和下划线的命名规范,可以自动实现对象的orm
+ */
+public class ORMUtil {
+    private static final Log LOG = LogFactory.getLog(ORMUtil.class);
+
+    public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass) {
+        return queryForObject(sql, paras, convertClass, null, null, null);
+    }
+
+    /**
+     * 通过sql查询一个唯一的对象出来,返回数据多于一条会报错
+     *
+     * @param sql                                                                查询语句
+     * @param paras                                                              如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @param convertClass,如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @param url                                                                数据库连接url
+     * @param userName                                                           用户名
+     * @param password                                                           密码
+     * @param <T>
+     * @return 转换后的对象
+     */
+    public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass, String url, String userName,
+                                       String password) {
+        List<T> result = queryForList(sql, paras, convertClass, url, userName, password);
+        if (result == null || result.size() == 0) {
+            return null;
+        }
+        if (result.size() > 1) {
+            throw new RuntimeException("expect only one row, actual " + result.size() + ". the builder is " + sql);
+        }
+        return result.get(0);
+    }
+
+    /**
+     * 根据sql查询一批对象
+     *
+     * @param sql          查询语句
+     * @param paras        如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @param convertClass 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @param <T>
+     * @return 返回对象列表
+     */
+    public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass) {
+        return queryForList(sql, paras, convertClass, null, null, null);
+    }
+
+    public static boolean hasConfigueDB() {
+        return ComponentCreator.getProperties().getProperty(ConfigureFileKey.JDBC_URL) != null;
+    }
+
+    /**
+     * @param sql          查询语句
+     * @param paras        如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @param convertClass 需要转换成的对象类。类的字段应该符合列名的命名规范,下划线换成驼峰形式
+     * @param url          数据库连接url
+     * @param userName     用户名
+     * @param password     密码
+     * @param <T>
+     * @return 返回对象列表
+     */
+    public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass, String url, String userName,
+                                           String password) {
+        sql = SQLUtil.parseIbatisSQL(paras, sql);
+        JDBCDriver dataSource = null;
+        try {
+            if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+                dataSource = DriverBuilder.createDriver();
+            } else {
+                dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+            }
+
+            List<Map<String, Object>> rows = dataSource.queryForList(sql);
+            List<T> result = new ArrayList();
+            for (Map<String, Object> row : rows) {
+                T t = convert(row, convertClass);
+                result.add(t);
+            }
+            return result;
+        } catch (Exception e) {
+            String errorMsg = ("query for list  error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+            LOG.error(errorMsg);
+            e.printStackTrace();
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    /**
+     * 执行sql,sql中可以有mybatis的参数#{name}
+     *
+     * @param sql   insert语句
+     * @param paras 可以是map,json或对象,只要key名或字段名和sql的参数名相同即可
+     * @return
+     */
+    public static boolean executeSQL(String sql, Object paras) {
+        if (paras != null) {
+            sql = SQLUtil.parseIbatisSQL(paras, sql);
+        }
+        JDBCDriver dataSource = null;
+        try {
+            dataSource = DriverBuilder.createDriver();
+            dataSource.execute(sql);
+            return true;
+        } catch (Exception e) {
+            String errorMsg = ("execute sql  error ,the sql is " + sql + ". the error msg is " + e.getMessage());
+            LOG.error(errorMsg);
+            e.printStackTrace();
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    /**
+     * 把一个对象的字段拼接成where条件,如果字段值为null,不拼接
+     *
+     * @param object     带拼接的对象
+     * @param fieldNames 需要拼接的字段名,如果为null,返回null
+     * @return where 部分的sql
+     */
+    public static String createQueryWhereSql(Object object, String... fieldNames) {
+        if (fieldNames == null || fieldNames.length == 0) {
+            return "";
+        }
+        StringBuilder stringBuilder = new StringBuilder();
+        boolean isFirst = true;
+        for (String fieldName : fieldNames) {
+            Object value = ReflectUtil.getBeanFieldValue(object, fieldName);
+            if (object != null && value == null) {
+                continue;
+            }
+            if (isFirst) {
+                isFirst = false;
+            } else {
+                stringBuilder.append(" and ");
+            }
+            String columnName = getColumnNameFromFieldName(fieldName);
+            stringBuilder.append(" " + columnName + "=#{" + fieldName + "} ");
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * 把一行数据转换成一个对象,符合驼峰的命名和下划线的命名规范
+     *
+     * @param row   一行数据
+     * @param clazz 待转化的对象类型
+     * @param <T>
+     * @return 转化对象
+     */
+    public static <T> T convert(Map<String, Object> row, Class<T> clazz) {
+        T t = ReflectUtil.forInstance(clazz);
+        Iterator<Map.Entry<String, Object>> it = row.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, Object> entry = it.next();
+            String columnName = entry.getKey();
+            Object value = entry.getValue();
+            if (value == null) {
+                continue;
+            }
+            String fieldName = getFieldNameFromColumnName(columnName);
+            DataType datatype = DataTypeUtil.createFieldDataType(clazz, fieldName);
+            Object columnValue = datatype.convert(value);
+            ReflectUtil.setBeanFieldValue(t, fieldName, columnValue);
+        }
+        return t;
+    }
+
+    /**
+     * 把列名转换成字段名称,把下划线转化成驼峰
+     *
+     * @param columnName
+     * @return
+     */
+    protected static String getFieldNameFromColumnName(String columnName) {
+        String[] values = columnName.split("_");
+        if (values.length == 1) {
+            return columnName;
+        }
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(values[0]);
+        for (int i = 1; i < values.length; i++) {
+            String value = values[i];
+            value = value.substring(0, 1).toUpperCase() + value.substring(1);
+            stringBuilder.append(value);
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * 对象批量替换,会生成replace into语句,多个对象会拼接成一个sql,提升效率
+     *
+     * @param values 待插入对象
+     */
+    public static void batchReplaceInto(Collection values) {
+        List list = new ArrayList<>();
+        list.addAll(values);
+        batchReplaceInto(list);
+    }
+
+    public static void batchReplaceInto(Object... valueArray) {
+        List values = new ArrayList();
+        if (valueArray != null) {
+            for (Object value : valueArray) {
+                values.add(value);
+            }
+        }
+        batchReplaceInto(values);
+    }
+
+    public static void batchReplaceInto(List<?> values) {
+        batchIntoByFlag(values, 1);
+    }
+
+    /**
+     * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率
+     *
+     * @param values 待插入对象
+     */
+    public static void batchIgnoreInto(List<?> values) {
+        batchIntoByFlag(values, -1);
+    }
+
+    /**
+     * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率
+     *
+     * @param values 待插入对象
+     */
+    public static void batchInsertInto(List<?> values) {
+        batchIntoByFlag(values, 0);
+    }
+
+    /**
+     * 批量插入对象,多个对象会拼接成一个sql flag==1 then replace into flag=-1 then insert ignore int flag=0 then insert int
+     *
+     * @param values
+     * @param flag
+     */
+    protected static void batchIntoByFlag(List<?> values, int flag) {
+        if (CollectionUtil.isEmpty(values)) {
+            return;
+        }
+        Object object = values.get(0);
+        Map<String, Object> paras = new HashMap<>(16);
+        MetaData metaData = createMetaDate(object, paras);
+        boolean containsIdField = false;
+        if (metaData.getIdFieldName() != null) {
+            for (Object o : values) {
+                Object id = ReflectUtil.getDeclaredField(o, metaData.getIdFieldName());
+                if (id == null) {
+                    containsIdField = false;
+                    break;
+                }
+                if (id instanceof Number) {
+                    if (Long.valueOf(id.toString()) == 0) {
+                        containsIdField = false;
+                        break;
+                    }
+                }
+                if (id instanceof String) {
+                    String idStr = (String)id;
+                    if (StringUtil.isEmpty(idStr)) {
+                        containsIdField = false;
+                        break;
+                    }
+                }
+            }
+        }
+
+        String sql = null;
+        if (flag == 0) {
+            sql = SQLUtil.createInsertSql(metaData, paras, containsIdField);
+        } else if (flag == 1) {
+            sql = SQLUtil.createInsertSql(metaData, paras, containsIdField);
+        } else if (flag == -1) {
+            sql = SQLUtil.createIgnoreInsertSql(metaData, paras, containsIdField);
+        } else {
+            throw new RuntimeException("the flag is not valdate " + flag);
+        }
+
+        List<Map<String, Object>> rows = new ArrayList<>();
+        for (int i = 1; i < values.size(); i++) {
+            Map<String, Object> row = createRow(metaData, values.get(i));
+            rows.add(row);
+        }
+        String valuesSQL = SQLUtil.createInsertValuesSQL(metaData, rows);
+        sql = sql + valuesSQL + " ON DUPLICATE  KEY  UPDATE " + SQLUtil.createDuplicateKeyUpdateSQL(metaData);
+        ;
+        JDBCDriver dataSource = DriverBuilder.createDriver();
+        try {
+            dataSource.execute(sql);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    private static Map<String, Object> createRow(MetaData metaData, Object object) {
+        Map<String, Object> row = new HashMap<>();
+        ReflectUtil.scanFields(object, new IFieldProcessor() {
+            @Override
+            public void doProcess(Object o, Field field) {
+                String fieldName = field.getName();
+                String columnName = getColumnNameFromFieldName(fieldName);
+                Object value = ReflectUtil.getBeanFieldValue(o, fieldName);
+                row.put(columnName, value);
+
+            }
+        });
+        return row;
+    }
+
+    public static void replaceInto(Object object) {
+        replaceInto(object, null, null, null);
+    }
+
+    /**
+     * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线。如果有重复到会被替换成最新的
+     *
+     * @param object
+     * @param url
+     * @param userName
+     * @param password
+     */
+    public static void replaceInto(Object object, String url, String userName, String password) {
+        Map<String, Object> paras = new HashMap<>();
+        if (Entity.class.isInstance(object)) {
+            Entity newEntity = (Entity)object;
+            newEntity.setGmtModified(new Date());
+            if (newEntity.getGmtCreate() == null) {
+                newEntity.setGmtCreate(new Date());
+            }
+        }
+        MetaData metaData = createMetaDate(object, paras);
+        String sql = SQLUtil.createReplacesInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+        JDBCDriver dataSource = null;
+        try {
+            if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+                dataSource = DriverBuilder.createDriver();
+            } else {
+                dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+            }
+            long id = dataSource.executeInsert(sql);
+            if (Entity.class.isInstance(object)) {
+                Entity newEntity = (Entity)object;
+                newEntity.setId(id);
+            }
+        } catch (Exception e) {
+            String errorMsg = ("replace into error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+            LOG.error(errorMsg);
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    public static void insertInto(Object object, boolean ignoreRepeateRow) {
+        insertInto(object, ignoreRepeateRow, null, null, null);
+    }
+
+    /**
+     * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线
+     *
+     * @param object
+     * @param ignoreRepeateRow,如果是重复数据,则不插入。基于唯一建做判断
+     * @param url
+     * @param userName
+     * @param password
+     */
+    public static void insertInto(Object object, boolean ignoreRepeateRow, String url, String userName,
+                                  String password) {
+        Map<String, Object> paras = new HashMap<>();
+        if (Entity.class.isInstance(object)) {
+            Entity newEntity = (Entity)object;
+            newEntity.setGmtCreate(DateUtil.getCurrentTime());
+            newEntity.setGmtModified(DateUtil.getCurrentTime());
+        }
+        MetaData metaData = createMetaDate(object, paras);
+        String sql = null;
+        if (ignoreRepeateRow) {
+            sql = SQLUtil.createIgnoreInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+        } else {
+            sql = SQLUtil.createInsertSql(metaData, paras, metaData.getIdFieldName() != null);
+        }
+        JDBCDriver dataSource = null;
+        try {
+            if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) {
+                dataSource = DriverBuilder.createDriver();
+            } else {
+                dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password);
+            }
+            long id = dataSource.executeInsert(sql);
+            if (Entity.class.isInstance(object)) {
+                Entity newEntity = (Entity)object;
+                newEntity.setId(id);
+            }
+        } catch (Exception e) {
+            String errorMsg = ("insert into error ,the builder is " + sql + ". the error msg is " + e.getMessage());
+            LOG.error(errorMsg);
+            throw new RuntimeException(errorMsg, e);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    /**
+     * 创建meta信息
+     *
+     * @param object
+     * @param paras
+     * @return
+     */
+    public static MetaData createMetaDate(Object object, Map<String, Object> paras) {
+        MetaData metaData = MetaData.createMetaDate(object, paras);
+        return metaData;
+    }
+
+    public static String getTableName(Class clazz) {
+        return getColumnNameFromFieldName(clazz.getSimpleName());
+    }
+
+    /**
+     * 把驼峰转换成下划线的形式
+     *
+     * @param para
+     * @return
+     */
+    protected static String getColumnNameFromFieldName(String para) {
+        return MetaData.getColumnNameFromFieldName(para);
+    }
+
+}
diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java
new file mode 100644
index 0000000..1f42470
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.operator;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.Changeable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.topology.ChainStage;
+import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
+import org.apache.rocketmq.streams.common.topology.stages.NewSQLChainStage;
+import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * SQL operator: executes one SQL statement per message. The statement may contain variables, which are
+ * replaced with values taken from the message before the statement is run.
+ */
+public class SQLOperator extends BasedConfigurable implements IStreamOperator<IMessage, IMessage>, IStageBuilder<ChainStage> {
+    private static final Log LOG = LogFactory.getLog(SQLOperator.class);
+    public static final String DEFALUT_DATA_KEY = "data";// message-body key under which doMessage stores the query result
+
+    @ENVDependence
+    protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    @Changeable
+    protected String sql;// the query SQL; supports iBatis syntax and variables. Because the variables get replaced, it is not auto-detected. e.g. select * from table where name=#{name=chris}
+
+    public SQLOperator() {
+        setType(IStreamOperator.TYPE);
+    }
+
+    public SQLOperator(String sql, String url, String userName, String password) {
+        this();
+        this.sql = sql;
+        this.url = url;
+        this.password = password;
+        this.userName = userName;
+    }
+
+    /**
+     * The db settings are mostly names here: url, userName and password are set to "<prefix>.url", "<prefix>.userName" and "<prefix>.password".
+     *
+     * @param sql            the SQL statement, may contain variables
+     * @param dbInfoNamePrex name prefix; if empty the configure name is used, and if that is empty too, the simple class name
+     */
+    public SQLOperator(String sql, String dbInfoNamePrex) {
+        this();
+        if (StringUtil.isEmpty(dbInfoNamePrex)) {
+            dbInfoNamePrex = getConfigureName();
+        }
+        if (StringUtil.isEmpty(dbInfoNamePrex)) {
+            dbInfoNamePrex = this.getClass().getSimpleName();
+        }
+        this.sql = sql;
+        this.url = dbInfoNamePrex + ".url";
+        this.password = dbInfoNamePrex + ".password";
+        this.userName = dbInfoNamePrex + ".userName";
+    }
+
+    /**
+     * Fills the variables of {@link #sql} from the message body, runs the query and puts the rows into the body under {@link #DEFALUT_DATA_KEY}.
+     */
+    @Override
+    public IMessage doMessage(IMessage message, AbstractContext context) {
+        String querySQL = SQLUtil.parseIbatisSQL(message.getMessageBody(), sql);// NOTE(review): message values end up inside the SQL text; confirm SQLUtil.parseIbatisSQL escapes them
+        List<Map<String, Object>> result = query(querySQL);
+        message.getMessageBody().put(DEFALUT_DATA_KEY, result);
+        return message;
+    }
+
+    /**
+     * Runs the given statement on a newly created driver; the driver is destroyed afterwards, whether or not the query succeeds.
+     *
+     * @param querySQL the statement to execute, with its variables already substituted
+     * @return the rows returned by {@code JDBCDriver.queryForList}
+     */
+    protected List<Map<String, Object>> query(String querySQL) {
+        JDBCDriver dataSource = null;
+        try {
+            dataSource = createDBDataSource();
+            // Execute the substituted statement that was passed in, not the raw template held in the sql field.
+            return dataSource.queryForList(querySQL);
+        } finally {
+            if (dataSource != null) {
+                dataSource.destroy();
+            }
+        }
+    }
+
+    public JDBCDriver createDBDataSource() {
+        return DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+    }
+
+    @Override
+    public void addConfigurables(PipelineBuilder pipelineBuilder) {
+        pipelineBuilder.addConfigurables(this);
+    }
+
+    @Override
+    public ChainStage createStageChain(PipelineBuilder pipelineBuilder) {
+        NewSQLChainStage sqlChainStage = new NewSQLChainStage();
+        sqlChainStage.setMessageProcessor(this);
+        return sqlChainStage;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java
new file mode 100644
index 0000000..3d5e51b
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+public class Person extends BasedConfigurable { // test fixture: a configurable with scalar, list and map fields
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    public static Person createPerson(String namespace) { // builds a fully populated sample Person in the given namespace
+        Person person = new Person();
+        person.setNameSpace(namespace);
+        person.setType("person");
+        person.setConfigureName("Chris");
+        person.setName("Chris");
+        List<String> addresses = new ArrayList<>();
+        addresses.add("huilongguan");
+        addresses.add("shangdi");
+        person.setAddresses(addresses);
+        Map<String, Integer> childName2Age = new HashMap<>();
+        childName2Age.put("yuanyahan", 8);
+        childName2Age.put("yuanruxi", 4);
+        person.setChildName2Age(childName2Age);
+        person.setMale(true);
+        person.setAge(18);
+        return person;
+    }
+
+    @Override
+    public String toString() { // lists only the fields declared in this class
+        return "Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
+            + ", childName2Age=" + childName2Age + '}';
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    public Boolean getMale() { // accessor for the isMale field
+        return isMale;
+    }
+
+    public void setMale(Boolean male) { // mutator for the isMale field
+        isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    @Override
+    public Object clone() { // delegates to super.clone(); returns null if that throws CloneNotSupportedException
+        Person person = null;
+        try {
+            person = (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e); // the failure is only printed, not rethrown
+        }
+        return person;
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java
new file mode 100644
index 0000000..3baa65d
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.configuable;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+import org.apache.rocketmq.streams.db.Person;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * Database storage test: fill in the connection parameters below before running. If the table has not been created yet, Configure.createTableSQL() provides the DDL.
+ */
+public class DBSupportParentConfigureServiceTest {
+    private String URL = "";
+    protected String USER_NAME = "";
+    protected String PASSWORD = "";
+    protected String TABLE_NAME = "dipper_configure_source";
+
+    @Test
+    public void testDBConfigurableService() {
+        String namespace = "streams.db.configuable";
+
+        // In real use these values come from the configuration file.
+        ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);// database connection url
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);// user name
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);// password
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME);
+
+        // Create the table if it does not exist. The driver is destroyed once the DDL has run
+        // instead of being dropped while still open.
+        org.apache.rocketmq.streams.db.driver.JDBCDriver driver = DriverBuilder.createDriver();
+        try {
+            driver.execute(Configure.createTableSQL(TABLE_NAME));
+        } finally {
+            driver.destroy();
+        }
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        configurableComponent.insert(createPerson(namespace));
+        configurableComponent.refreshConfigurable(namespace);
+        Person person = configurableComponent.queryConfigurable("person", "peronName");
+        assertTrue(person != null);
+    }
+
+    /** Creates the Person configurable (type "person", configure name "peronName") that the test inserts and then queries. */
+    protected Person createPerson(String namespace) {
+        Person person = new Person();
+        person.setName("chris");
+        person.setAge(18);
+        person.setNameSpace(namespace);
+        person.setConfigureName("peronName");
+        person.setType("person");
+        return person;
+    }
+}
diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java
new file mode 100644
index 0000000..e9b1fa2
--- /dev/null
+++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.driver.orm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.db.Person;
+import org.junit.Test;
+
+public class ORMUtilTest {
+    private String URL = "";
+    protected String USER_NAME = "";
+    protected String PASSWORD = "";
+
+    public ORMUtilTest() {
+        // In real use these values are set in the configuration file.
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);// database connection url
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);// user name
+        ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password
+    }
+
+    @Test
+    public void testInsert() {
+        String namespace = "org.apache.configuable.test";
+        List<Person> personList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            personList.add(createPerson(namespace, "chris" + i));
+        }
+        /**
+         * No connection info (url, userName, password) is passed, so the connection info of the ConfigurableComponent is used.
+         */
+        ORMUtil.batchIgnoreInto(personList);// batch insert; on unique-key conflict: replace. NOTE(review): this calls batchIgnoreInto like the next line; a "replace" variant may have been intended, confirm
+        ORMUtil.batchIgnoreInto(personList);// batch insert; on unique-key conflict: ignore
+        ORMUtil.batchIntoByFlag(personList, 0);// batch insert; on unique-key conflict: raise an error (per the original comment)
+    }
+
+    @Test
+    public void testQueryList() {
+        Map<String, Integer> paras = new HashMap<>();
+        paras.put("age", 18);
+        List<Person> personList = ORMUtil.queryForList("select * from person where age >${age} limit 100", paras, Person.class);
+    }
+
+    @Test
+    public void testQueryOneRow() {
+        Person personPara = new Person();
+        personPara.setAge(18);
+        personPara.setName("chris1");
+        Person person = ORMUtil.queryForObject("select * from person where age =${age} and name='${name}' ", personPara, Person.class, URL, USER_NAME, PASSWORD);
+    }
+
+    /**
+     * Creates the configurable object used by the tests.
+     *
+     * @param namespace the namespace set on the created Person
+     * @return a Person with the given namespace and name, age 18, type "person" and configure name "peronName"
+     */
+    protected Person createPerson(String namespace, String name) {
+        Person person = new Person();
+        person.setName(name);
+        person.setAge(18);
+        person.setNameSpace(namespace);
+        person.setConfigureName("peronName");
+        person.setType("person");
+        return person;
+    }
+}
diff --git a/rocketmq-streams-transport-minio/pom.xml b/rocketmq-streams-transport-minio/pom.xml
new file mode 100755
index 0000000..510b8cc
--- /dev/null
+++ b/rocketmq-streams-transport-minio/pom.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-streams</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-streams-transport-minio</artifactId>
+    <packaging>jar</packaging>
+    <name>ROCKETMQ STREAMS :: transport-minio</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-serviceloader</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml
new file mode 100644
index 0000000..af11f77
--- /dev/null
+++ b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5">
+    <output url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+    <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+      <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
+      <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" />
+      <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java
new file mode 100644
index 0000000..c2a6884
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.transport.minio;
+
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.transport.AbstractFileTransport;
+import org.apache.rocketmq.streams.common.transport.IFileTransport;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import com.google.auto.service.AutoService;
+import io.minio.MinioClient;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * {@link IFileTransport} implementation that stores files as objects in a single MinIO bucket.
+ */
+@AutoService(IFileTransport.class)
+@ServiceName(MinioFileTransport.NAME)
+public class MinioFileTransport extends AbstractFileTransport {
+    public static final String NAME = "minio";
+    protected String ak;
+    protected String sk;
+    protected String endpoint;
+    protected String dirpperDir;// bucket name passed to every MinioClient call
+    // volatile because getOrCreateMinioClient() reads this field outside its synchronized block
+    protected volatile MinioClient minioClient;
+
+    /**
+     * Reads access key, secret key, endpoint and bucket name from the component properties; an empty bucket name becomes "dipper_files".
+     */
+    public MinioFileTransport() {
+        this.ak = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_AK);
+        this.sk = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_SK);
+        this.endpoint = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_ENDPOINT);
+        this.dirpperDir = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_DIPPER_DIR);
+        if (StringUtil.isEmpty(this.dirpperDir)) {
+            this.dirpperDir = "dipper_files";
+        }
+    }
+
+    /**
+     * Downloads an object into a local file as an exact byte copy, replacing the file if it exists.
+     *
+     * @param remoteFileName name of the object inside the bucket
+     * @param localDir       directory of the local target file
+     * @param localFileName  name of the local target file
+     * @return the local file that was written
+     * @throws RuntimeException if fetching the object or writing the file fails
+     */
+    @Override
+    public File download(String remoteFileName, String localDir, String localFileName) {
+        MinioClient minioClient = getOrCreateMinioClient();
+        // Copy raw bytes instead of text lines so line terminators and non-text content are preserved;
+        // try-with-resources closes the object stream on every path.
+        try (InputStream input = minioClient.getObject(dirpperDir, remoteFileName)) {
+            File file = new File(FileUtil.concatFilePath(localDir, localFileName));
+            java.nio.file.Files.copy(input, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+            return file;
+        } catch (Exception e) {
+            throw new RuntimeException("dowload file error " + dirpperDir + "/" + remoteFileName, e);
+        }
+    }
+
+    /**
+     * Uploads the local file to the bucket as {@code remoteFileName}; returns true, or throws RuntimeException if the upload fails.
+     */
+    @Override
+    public Boolean upload(File file, String remoteFileName) {
+        MinioClient minioClient = getOrCreateMinioClient();
+        try {
+            minioClient.putObject(dirpperDir, remoteFileName, file.getAbsolutePath());
+        } catch (Exception e) {
+            throw new RuntimeException("upload file error " + dirpperDir + "/" + remoteFileName, e);
+        }
+        return true;
+    }
+
+    /**
+     * Removes {@code remoteFileName} from the bucket; returns true, or throws RuntimeException if the removal fails.
+     */
+    @Override
+    public boolean delete(String remoteFileName) {
+        MinioClient minioClient = getOrCreateMinioClient();
+        try {
+            minioClient.removeObject(dirpperDir, remoteFileName);
+        } catch (Exception e) {
+            throw new RuntimeException("delete file error " + dirpperDir + "/" + remoteFileName, e);
+        }
+        return true;
+    }
+
+    /**
+     * Returns the shared client, creating it (and the bucket, if it does not exist yet) on first use.
+     */
+    protected MinioClient getOrCreateMinioClient() {
+        if (this.minioClient == null) {
+            synchronized (this) {
+                if (minioClient == null) {
+                    try {
+                        MinioClient minioClient = new MinioClient(endpoint, ak, sk);
+                        boolean existDir = minioClient.bucketExists(dirpperDir);
+
+                        if (!existDir) {
+                            minioClient.makeBucket(dirpperDir);
+                        }
+                        this.minioClient = minioClient;
+                    } catch (Exception e) {
+                        throw new RuntimeException("create minio client error", e);
+                    }
+
+                }
+            }
+        }
+        return this.minioClient;
+    }
+}
+
+
diff --git a/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java
new file mode 100644
index 0000000..a3ebf6b
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.aliyun.yundun.dipper.configurable;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.configurable.model.Person;
+
+public class DataTpyeTest {
+    @Test
+    public void testDataType() {
+        // Round trip: serialize a sample Person to JSON, load the JSON into a fresh instance and print it.
+        String json = Person.createPerson("com.dipper.test").toJson();
+
+        Person restored = new Person();
+        restored.toObject(json);
+        System.out.println(restored);
+    }
+
+    @Test
+    public void testV2() { // NOTE(review): reads two files from absolute paths under /Users/yuanxiaodong
+        Set<String> names = new HashSet<>(); // filled below but never read afterwards
+        names.add("北斗");
+        names.add("福建jz");
+        names.add("甘肃jz");
+        names.add("广东省气象micaps云");
+        names.add("贵州公安科信");
+        names.add("贵州警务云");
+        names.add("杭州税友");
+        names.add("江西公安大数据平台");
+        names.add("昆仑项目");
+        names.add("新华网");
+        names.add("浙江气象高时空分辨率气象预报专有云");
+        List<String> projectLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt");
+        List<String> cloudLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt");
+        int pairCount = 0; // counts every matching (project line, cloud line) pair
+        for (String projectLine : projectLines) {
+            boolean matched = false;
+            for (String cloudLine : cloudLines) {
+                if (cloudLine.contains(projectLine) || projectLine.contains(cloudLine)) {
+                    matched = true;
+                    pairCount++;
+                }
+            }
+            if (!matched) {
+                System.out.println(projectLine); // project lines with no containing/contained cloud line
+            }
+        }
+        System.out.println(pairCount);
+    }
+}
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java
new file mode 100644
index 0000000..a1570c3
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.junit.Test;
+
+public class DataTpyeTest {
+    @Test
+    public void testDataType() {
+        // Round trip: serialize a sample Person to JSON, load the JSON into a fresh instance and print it.
+        String json = Person.createPerson("com.dipper.test").toJson();
+
+        Person restored = new Person();
+        restored.toObject(json);
+        System.out.println(restored);
+    }
+
+    @Test
+    public void testV2() { // NOTE(review): reads two files from absolute paths under /Users/yuanxiaodong
+        Set<String> names = new HashSet<>(); // filled below but never read afterwards
+        names.add("北斗");
+        names.add("福建jz");
+        names.add("甘肃jz");
+        names.add("广东省气象micaps云");
+        names.add("贵州公安科信");
+        names.add("贵州警务云");
+        names.add("杭州税友");
+        names.add("江西公安大数据平台");
+        names.add("昆仑项目");
+        names.add("新华网");
+        names.add("浙江气象高时空分辨率气象预报专有云");
+        List<String> projectLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt");
+        List<String> cloudLines = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt");
+        int pairCount = 0; // counts every matching (project line, cloud line) pair
+        for (String projectLine : projectLines) {
+            boolean matched = false;
+            for (String cloudLine : cloudLines) {
+                if (cloudLine.contains(projectLine) || projectLine.contains(cloudLine)) {
+                    matched = true;
+                    pairCount++;
+                }
+            }
+            if (!matched) {
+                System.out.println(projectLine); // project lines with no containing/contained cloud line
+            }
+        }
+        System.out.println(pairCount);
+    }
+}
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
new file mode 100644
index 0000000..04b99bb
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+/**
+ * Sample configurable used by the tests: a person with a name, an age, a gender flag,
+ * a list of addresses and a map from child name to child age.
+ */
+public class Person extends BasedConfigurable {
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds a fully populated sample person named "Chris".
+     *
+     * @param namespace value handed to {@code setNameSpace}
+     * @return the populated instance
+     */
+    public static Person createPerson(String namespace) {
+        List<String> addressList = new ArrayList<>();
+        addressList.add("huilongguan");
+        addressList.add("shangdi");
+
+        Map<String, Integer> children = new HashMap<>();
+        children.put("yuanyahan", 8);
+        children.put("yuanruxi", 4);
+
+        Person sample = new Person();
+        sample.setNameSpace(namespace);
+        sample.setType("person");
+        sample.setConfigureName("Chris");
+        sample.setName("Chris");
+        sample.setAddresses(addressList);
+        sample.setChildName2Age(children);
+        sample.setMale(true);
+        sample.setAge(18);
+        return sample;
+    }
+
+    /**
+     * @return a one-line rendering of all five fields
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Person{");
+        text.append("name='").append(name).append('\'');
+        text.append(", age=").append(age);
+        text.append(", isMale=").append(isMale);
+        text.append(", addresses=").append(addresses);
+        text.append(", childName2Age=").append(childName2Age);
+        text.append('}');
+        return text.toString();
+    }
+
+    /** @return the person's name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the person's name */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the person's age */
+    public int getAge() {
+        return this.age;
+    }
+
+    /** @param age the person's age */
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    /** @return the gender flag, possibly {@code null} when it was never set */
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    /** @param male the gender flag */
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    /** @return the address list exactly as it was stored */
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    /** @param addresses the address list, stored without copying */
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    /** @return the child-name-to-age map exactly as it was stored */
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    /** @param childName2Age the child-name-to-age map, stored without copying */
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+}
diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
new file mode 100644
index 0000000..709978a
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test fixture describing a person: name, age, gender flag, addresses and the ages of
+ * the person's children keyed by child name.
+ */
+public class Person extends BasedConfigurable {
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Creates the sample person "Chris" with two addresses and two children.
+     *
+     * @param namespace value passed on to {@code setNameSpace}
+     * @return the newly created, fully populated person
+     */
+    public static Person createPerson(String namespace) {
+        Person chris = new Person();
+        chris.setNameSpace(namespace);
+        chris.setType("person");
+        chris.setConfigureName("Chris");
+        chris.setName("Chris");
+
+        List<String> homes = new ArrayList<>();
+        homes.add("huilongguan");
+        homes.add("shangdi");
+        chris.setAddresses(homes);
+
+        Map<String, Integer> agesByChild = new HashMap<>();
+        agesByChild.put("yuanyahan", 8);
+        agesByChild.put("yuanruxi", 4);
+        chris.setChildName2Age(agesByChild);
+
+        chris.setMale(true);
+        chris.setAge(18);
+        return chris;
+    }
+
+    /**
+     * @return all five fields rendered on a single line
+     */
+    @Override
+    public String toString() {
+        return new StringBuilder("Person{")
+            .append("name='").append(name).append('\'')
+            .append(", age=").append(age)
+            .append(", isMale=").append(isMale)
+            .append(", addresses=").append(addresses)
+            .append(", childName2Age=").append(childName2Age)
+            .append('}')
+            .toString();
+    }
+
+    /** @return the name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the name to store */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the age */
+    public int getAge() {
+        return this.age;
+    }
+
+    /** @param age the age to store */
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    /** @return the gender flag; {@code null} if it was never assigned */
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    /** @param male the gender flag to store */
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    /** @return the stored address list (not a copy) */
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    /** @param addresses the address list to store (not copied) */
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    /** @return the stored child-name-to-age map (not a copy) */
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    /** @param childName2Age the child-name-to-age map to store (not copied) */
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+}
diff --git a/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties
new file mode 100644
index 0000000..598511e
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties
@@ -0,0 +1,7 @@
+dipper.configurable.service.type=resource_support_parent
+dipper.channle.ak=xxxxxx
+dipper.channle.sk=xxxxxx
+dipper.rds.jdbc.driver=com.mysql.jdbc.Driver
+dipper.rds.jdbc.url=xxxxxxx
+dipper.rds.jdbc.username=xxxxxx
+dipper.rds.jdbc.password=xxxxx
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/log4j.xml b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "http://toolkit.alibaba-inc.com/dtd/log4j/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+        </layout>
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="INFO"/>
+            <param name="LevelMax" value="ERROR"/>
+        </filter>
+    </appender>
+
+    <root>
+        <priority value="INFO"/>
+        <appender-ref ref="Console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt
new file mode 100644
index 0000000..34a186f
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt
@@ -0,0 +1,11 @@
+paserBySplit(@,uuid,file_path,pid,ppid,pfile_path,group_name,group_id,user_name,uid,euid,egroup_id,time,cmd_line,index,perm,tty,pcmdline,sid,cwd,filename);
+addRandom(messageId,10);
+rename(groupname,group_name);
+rename(username,user_name);
+rename(seq,index);
+rename(egourpid,egroup_id);
+rename(filepath,file_path);
+rename(groupid,group_id);
+rename(pfilename,pfile_name);
+rename(safe_mode,perm);
+rename(cmdline,cmd_line);
\ No newline at end of file
diff --git a/rocketmq-streams-transport-minio/src/test/resources/python_script.py b/rocketmq-streams-transport-minio/src/test/resources/python_script.py
new file mode 100644
index 0000000..f4e7252
--- /dev/null
+++ b/rocketmq-streams-transport-minio/src/test/resources/python_script.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+# coding=utf-8
+import json;
+import re;
+import time;
+# Anchored at the start of the string: group 1 captures the text between the leading "/"
+# and the last "/" that is followed by at least one word character,
+# e.g. "/usr/bin/ls" -> "usr/bin".
+# NOTE(review): "\w" sits in a non-raw string literal; consider r'...' so newer Python 3
+# interpreters do not warn about an invalid escape sequence.
+regex = '^/(.*)/\w+'
+pattern = re.compile(regex)
+
+def pythonTest(*processLine):
+    try:
+        jsonObject = json.loads(processLine[0])
+
+        if jsonObject.has_key('filepath'):
+            filePath = jsonObject['filepath']
+            match = pattern.search(filePath)
+            if match:
+                return match.group(1)
+        else:
+            pass # print "does not has key filepath"
+    except BaseException as e:
+        pass # print "process one line cause exception %s" %e
+    return "does not match"