You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/08/02 10:55:17 UTC

[rocketmq-streams] 04/15: add channel-db module

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit ed97ca3daee8bd3e71c4c5150a54381867a07756
Author: vv <ze...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:03:59 2021 +0800

    add channel-db module
---
 rocketmq-streams-channel-db/pom.xml                |  21 ++
 .../streams/db/sink/AbstractMultiTableSink.java    | 150 +++++++++++++
 .../apache/rocketmq/streams/db/sink/DBSink.java    | 239 +++++++++++++++++++++
 .../rocketmq/streams/db/sink/DBSinkBuilder.java    |  76 +++++++
 .../streams/db/sink/SelfMultiTableSink.java        |  53 +++++
 .../streams/db/sink/SplitBySerialNumber.java       |  36 ++++
 .../streams/db/sink/SplitByTimeMultiTableSink.java |  36 ++++
 .../streams/db/sink/db/DBWriteOnlyChannelTest.java |  84 ++++++++
 8 files changed, 695 insertions(+)

diff --git a/rocketmq-streams-channel-db/pom.xml b/rocketmq-streams-channel-db/pom.xml
new file mode 100755
index 0000000..10d760e
--- /dev/null
+++ b/rocketmq-streams-channel-db/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<!-- Maven module descriptor for the channel-db module (database sink channel). -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Inherits build configuration from the rocketmq-streams parent POM. -->
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-streams</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-streams-channel-db</artifactId>
+    <name>ROCKETMQ STREAMS :: channel-db</name>
+    <packaging>jar</packaging>
+    <dependencies>
+        <!-- No version is declared here; presumably it is supplied by the parent's
+             dependencyManagement section (TODO confirm against the parent POM). -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-db-operator</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
new file mode 100644
index 0000000..6547615
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base class for sinks that distribute messages over several database tables.
+ *
+ * <p>Each split (queue) id is mapped to its own child {@link DBSink}; the table name of that
+ * child is supplied by {@link #createTableName(String)}. Child sinks are created lazily the
+ * first time a split id is seen and are cached in {@link #tableSinks}.
+ */
+public abstract class AbstractMultiTableSink extends DBSink {
+    /** Child sinks keyed by split (queue) id. */
+    protected transient ConcurrentHashMap<String, DBSink> tableSinks = new ConcurrentHashMap<>();
+    /**
+     * Messages added through this sink minus the sizes reported by the child caches when
+     * they flush (see {@link SingleDBSinkCache}).
+     */
+    protected transient AtomicLong messageCount = new AtomicLong(0);
+    /** Not read by this class; kept because subclasses may rely on it. */
+    protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;
+
+    /**
+     * @param url      JDBC url shared by every child sink
+     * @param userName database user shared by every child sink
+     * @param password database password shared by every child sink
+     */
+    public AbstractMultiTableSink(String url, String userName, String password) {
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+    }
+
+    /**
+     * Adds the message to the child sink that belongs to {@code split} and flushes that split
+     * once the overall pending-message counter reaches the configured batch size.
+     *
+     * @param message the message to buffer
+     * @param split   the split the message belongs to; must not be null
+     * @return whatever the child sink's {@code batchAdd} returned
+     * @throws NullPointerException if {@code split} is null
+     */
+    @Override
+    public boolean batchAdd(IMessage message, ISplit split) {
+        if (split == null) {
+            // Same exception type the original dereference produced, but with a usable message.
+            throw new NullPointerException("split must not be null: no target table can be resolved for the message");
+        }
+        String queueId = split.getQueueId();
+        DBSink sink = getOrCreateDBSink(queueId);
+        boolean success = sink.batchAdd(message, split);
+        if (messageCount.incrementAndGet() >= getBatchSize()) {
+            Set<String> queueIds = new HashSet<>();
+            queueIds.add(queueId);
+            flush(queueIds);
+        }
+        return success;
+    }
+
+    /**
+     * Resolves the split via {@link #getSplitFromMessage(IMessage)} and delegates to
+     * {@link #batchAdd(IMessage, ISplit)}.
+     */
+    @Override
+    public boolean batchAdd(IMessage message) {
+        return batchAdd(message, getSplitFromMessage(message));
+    }
+
+    /** Not supported by multi-table sinks; always throws. */
+    @Override
+    public boolean batchSave(List<IMessage> messages) {
+        throw new RuntimeException("can not support this method");
+    }
+
+    /**
+     * Flushes the child sinks of the given splits.
+     *
+     * <p>Splits that never received a message have no child sink and are skipped; a flush must
+     * not create (and initialise) a new database sink that has nothing to write.
+     *
+     * @param splitIds split ids to flush; {@code null} is treated as "nothing to do"
+     * @return always {@code true}
+     */
+    @Override
+    public boolean flush(Set<String> splitIds) {
+        if (splitIds == null) {
+            return true;
+        }
+        for (String splitId : splitIds) {
+            DBSink sink = this.tableSinks.get(splitId);
+            if (sink != null) {
+                sink.flush();
+            }
+        }
+        return true;
+    }
+
+    /** Flushes every child sink created so far. */
+    @Override
+    public boolean flush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.flush();
+        }
+        return true;
+    }
+
+    /** Enables auto flush on every child sink created so far. */
+    @Override
+    public void openAutoFlush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.openAutoFlush();
+        }
+    }
+
+    /** Disables auto flush on every child sink created so far. */
+    @Override
+    public void closeAutoFlush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.closeAutoFlush();
+        }
+    }
+
+    /**
+     * Returns the child sink for {@code splitId}, creating and initialising it on first use.
+     *
+     * @param splitId split (queue) id
+     * @return the cached sink for that split
+     */
+    protected DBSink getOrCreateDBSink(String splitId) {
+        DBSink existing = this.tableSinks.get(splitId);
+        if (existing != null) {
+            return existing;
+        }
+        DBSink sink = new DBSink();
+        sink.setUrl(url);
+        sink.setUserName(userName);
+        sink.setPassword(password);
+        sink.setJdbcDriver(this.jdbcDriver);
+        sink.setTableName(createTableName(splitId));
+        sink.setBatchSize(batchSize);
+        // Install the counting cache BEFORE enabling auto flush.
+        // NOTE(review): this assumes openAutoFlush() acts on the sink's current message cache,
+        // so enabling it first would have targeted the cache that is replaced here - confirm
+        // against AbstractSink.
+        sink.setMessageCache(new SingleDBSinkCache(sink));
+        sink.openAutoFlush();
+        sink.init();
+        DBSink winner = this.tableSinks.putIfAbsent(splitId, sink);
+        if (winner != null) {
+            // Another thread registered a sink for this split first; release the one built here
+            // so its auto flush does not keep running for a sink nobody will ever use.
+            sink.closeAutoFlush();
+            return winner;
+        }
+        return sink;
+    }
+
+    /**
+     * @param splitId split (queue) id
+     * @return name of the table that stores the messages of this split
+     */
+    protected abstract String createTableName(String splitId);
+
+    /**
+     * @param message incoming message
+     * @return the split the message belongs to
+     */
+    protected abstract ISplit getSplitFromMessage(IMessage message);
+
+    /**
+     * Message cache of a single child sink that subtracts every flushed batch from the
+     * enclosing sink's {@link #messageCount}.
+     */
+    protected class SingleDBSinkCache extends MessageCache<IMessage> {
+
+        public SingleDBSinkCache(
+            IMessageFlushCallBack<IMessage> flushCallBack) {
+            super(flushCallBack);
+        }
+
+        @Override
+        public int flush(Set<String> splitIds) {
+            int size = super.flush(splitIds);
+            messageCount.addAndGet(-size);
+            return size;
+        }
+
+        @Override
+        public int flush() {
+            int size = super.flush();
+            messageCount.addAndGet(-size);
+            return size;
+        }
+    }
+
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
new file mode 100644
index 0000000..5732424
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.channel.IChannel;
+import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+
+import java.sql.*;
+import java.util.List;
+
+/**
+ * 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
+ */
+public class DBSink extends AbstractSink {
+
+    protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
+
+    protected MetaData metaData;//可以指定meta data,和insertSQL二选一
+
+    protected String tableName; //指定要插入的数据表
+
+    @ENVDependence
+    protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    /**
+     * db串多数是名字,可以取个名字前缀,如果值为空,默认为此类的name,name为空,默认为简单类名
+     *
+     * @param insertSQL        sql模版
+     * @param dbInfoNamePrefix 参数可以是名字,这个是名字前缀.真实值可以配置在配置文件中
+     */
+    public DBSink(String insertSQL, String dbInfoNamePrefix) {
+        setType(IChannel.TYPE);
+        if (StringUtil.isEmpty(dbInfoNamePrefix)) {
+            dbInfoNamePrefix = getConfigureName();
+        }
+        if (StringUtil.isEmpty(dbInfoNamePrefix)) {
+            dbInfoNamePrefix = this.getClass().getSimpleName();
+        }
+        this.insertSQLTemplate = insertSQL;
+        this.url = dbInfoNamePrefix + ".url";
+        this.password = dbInfoNamePrefix + ".password";
+        this.userName = dbInfoNamePrefix + ".userName";
+    }
+
+    public DBSink() {
+        setType(IChannel.TYPE);
+    }
+
+    public DBSink(String url, String userName, String password) {
+        setType(IChannel.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+    }
+
+    public DBSink(String insertSQL, String url, String userName, String password) {
+        setType(IChannel.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+        this.insertSQLTemplate = insertSQL;
+    }
+
+    @Override
+    protected boolean initConfigurable() {
+        try {
+            Class.forName("com.mysql.jdbc.Driver");
+            if (StringUtil.isNotEmpty(this.tableName)) {
+                Connection connection = DriverManager.getConnection(url, userName, password);
+                DatabaseMetaData metaData = connection.getMetaData();
+                ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
+                this.metaData = MetaData.createMetaData(metaResult);
+                this.metaData.setTableName(this.tableName);
+            }
+            return super.initConfigurable();
+        } catch (ClassNotFoundException | SQLException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    @Override
+    protected boolean batchInsert(List<IMessage> messageList) {
+        JDBCDriver dbDataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+        try {
+            if (messageList == null || messageList.size() == 0) {
+                return true;
+            }
+            List<JSONObject> messages = convertJsonObjectFromMessage(messageList);
+            if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
+                String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
+                sql = sql + SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
+                executeSQL(dbDataSource, sql);
+                return true;
+            }
+            String insertValueSQL = parseInsertValues(insertSQLTemplate);
+            if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
+                for (JSONObject message : messages) {
+                    String sql = parseSQL(message, insertSQLTemplate);
+                    executeSQL(dbDataSource, sql);
+                }
+                return true;
+            } else {
+                StringBuilder sb = new StringBuilder();
+                String insertSQL;
+                boolean isFirst = true;
+                int i = 0;
+                for (JSONObject message : messages) {
+                    insertSQL = parseSQL(message, insertValueSQL);
+                    if (isFirst) {
+                        isFirst = false;
+                    } else {
+                        sb.append(",");
+                    }
+                    i++;
+
+                    sb.append(insertSQL);
+                }
+                insertSQL = this.insertSQLTemplate.replace(insertValueSQL, sb.toString());
+                executeSQL(dbDataSource, insertSQL);
+                return true;
+            }
+        } finally {
+            dbDataSource.destroy();
+        }
+    }
+
+    protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+        dbDataSource.execute(sql);
+    }
+
+    /**
+     * 解析出insert value数据部分,对于批量的插入,效果会更佳
+     */
+    private static final String VALUES_NAME = "values";
+
+    protected String parseInsertValues(String insertSQL) {
+        int start = insertSQL.toLowerCase().indexOf(VALUES_NAME);
+        if (start == -1) {
+            return null;
+        }
+        String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
+        int end = valuesSQL.toLowerCase().lastIndexOf(")");
+        if (end == -1) {
+            return null;
+        }
+        return valuesSQL.substring(0, end + 1);
+    }
+
+    protected String parseSQL(JSONObject message, String sql) {
+        return SQLUtil.parseIbatisSQL(message, sql);
+    }
+
+    public String getInsertSQLTemplate() {
+        return insertSQLTemplate;
+    }
+
+    public void setInsertSQLTemplate(String insertSQLTemplate) {
+        this.insertSQLTemplate = insertSQLTemplate;
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public MetaData getMetaData() {
+        return metaData;
+    }
+
+    public void setMetaData(MetaData metaData) {
+        this.metaData = metaData;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
new file mode 100644
index 0000000..ef7ae28
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
+import org.apache.rocketmq.streams.common.channel.sink.ISink;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.metadata.MetaDataField;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import com.google.auto.service.AutoService;
+
+/**
+ * Channel builder for the {@code rds} type: creates a {@link DBSink} from connection
+ * properties and the metadata of the target table. Sources are not supported.
+ */
+@AutoService(IChannelBuilder.class)
+@ServiceName(DBSinkBuilder.TYPE)
+public class DBSinkBuilder implements IChannelBuilder {
+    public static final String TYPE = "rds";
+
+    /**
+     * Builds a sink whose insert template covers every field of {@code metaData}.
+     *
+     * @param namespace  not used by this builder
+     * @param name       not used by this builder
+     * @param properties supplies {@code url}, {@code userName}, {@code password} and
+     *                   {@code tableName}
+     * @param metaData   field description used to generate the insert template
+     * @return the configured sink
+     */
+    @Override
+    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
+        DBSink sink = new DBSink();
+        sink.setUrl(properties.getProperty("url"));
+        // Read the credentials from the properties; passing the key names themselves would
+        // make the sink log in as user "userName" with password "password".
+        sink.setUserName(properties.getProperty("userName"));
+        sink.setPassword(properties.getProperty("password"));
+        List<MetaDataField> fieldList = metaData.getMetaDataFields();
+        StringBuilder columns = new StringBuilder();
+        StringBuilder placeholders = new StringBuilder();
+        boolean isFirst = true;
+        for (MetaDataField field : fieldList) {
+            String fieldName = field.getFieldName();
+            if (isFirst) {
+                isFirst = false;
+            } else {
+                columns.append(",");
+                placeholders.append(",");
+            }
+            columns.append(fieldName);
+            if (DataTypeUtil.isNumber(field.getDataType())) {
+                // Numbers are inserted unquoted, but still through a placeholder so the
+                // message value (not the bare column name) ends up in the statement.
+                placeholders.append("#{").append(fieldName).append("}");
+            } else {
+                placeholders.append("'#{").append(fieldName).append("}'");
+            }
+        }
+        String sql = "insert into " + properties.getProperty("tableName") + "(" + columns + ")values(" + placeholders + ")";
+        sink.setInsertSQLTemplate(sql);
+        return sink;
+    }
+
+    /** Not supported: this builder only creates sinks. */
+    @Override
+    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
+        throw new RuntimeException("can not support this method");
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
new file mode 100644
index 0000000..96922e6
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
+import org.apache.rocketmq.streams.common.utils.Base64Utils;
+import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
+
+/**
+ * Multi-table sink whose split and table-name decisions are delegated to a user-supplied
+ * {@link MultiTableSplitFunction}.
+ *
+ * <p>The function is kept in the inherited transient {@code multiTableSplitFunction} field.
+ * Its serialized, Base64-encoded form is stored next to it so that the function can be
+ * rebuilt in {@link #doProcessAfterRefreshConfigurable(IConfigurableService)}.
+ */
+public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner {
+    /** Serialized bytes of the user-defined split function, Base64-encoded. */
+    protected String multiTableSplitFunctionSerializeValue;
+
+    // The split function itself lives in the field inherited from AbstractMultiTableSink;
+    // redeclaring it here would shadow that field and leave the parent's copy null.
+
+    /**
+     * @param url                     JDBC url
+     * @param userName                database user
+     * @param password                database password
+     * @param multiTableSplitFunction user-defined routing function; it is serialized here, so
+     *                                it has to be serializable by {@code InstantiationUtil}
+     */
+    public SelfMultiTableSink(String url, String userName, String password, MultiTableSplitFunction<IMessage> multiTableSplitFunction) {
+        super(url, userName, password);
+        this.multiTableSplitFunction = multiTableSplitFunction;
+        byte[] bytes = InstantiationUtil.serializeObject(multiTableSplitFunction);
+        multiTableSplitFunctionSerializeValue = Base64Utils.encode(bytes);
+    }
+
+    /** Asks the split function for the table that stores the given split. */
+    @Override
+    protected String createTableName(String splitId) {
+        return multiTableSplitFunction.createTableFromSplitId(splitId);
+    }
+
+    /** Asks the split function which split the message belongs to. */
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        return multiTableSplitFunction.createSplit(message);
+    }
+
+    /** Rebuilds the transient split function from its Base64-encoded serialized form. */
+    @Override
+    public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
+        byte[] bytes = Base64Utils.decode(multiTableSplitFunctionSerializeValue);
+        this.multiTableSplitFunction = InstantiationUtil.deserializeObject(bytes);
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
new file mode 100644
index 0000000..c2a49b7
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+
+/**
+ * Placeholder for a multi-table sink that splits by serial number.
+ *
+ * <p>The routing is not implemented yet. Both hooks fail fast instead of returning
+ * {@code null}, which would otherwise surface later as a NullPointerException in the base
+ * class or as a child sink without a table name.
+ */
+public class SplitBySerialNumber extends AbstractMultiTableSink {
+    public SplitBySerialNumber(String url, String userName, String password) {
+        super(url, userName, password);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; table naming is not implemented
+     */
+    @Override
+    protected String createTableName(String splitId) {
+        throw new UnsupportedOperationException("SplitBySerialNumber.createTableName is not implemented");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; split resolution is not implemented
+     */
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        throw new UnsupportedOperationException("SplitBySerialNumber.getSplitFromMessage is not implemented");
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
new file mode 100644
index 0000000..87a2b3e
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+
+/**
+ * Placeholder for a multi-table sink that splits by time.
+ *
+ * <p>The routing is not implemented yet. Both hooks fail fast instead of returning
+ * {@code null}, which would otherwise surface later as a NullPointerException in the base
+ * class or as a child sink without a table name.
+ */
+public class SplitByTimeMultiTableSink extends AbstractMultiTableSink {
+    public SplitByTimeMultiTableSink(String url, String userName, String password) {
+        super(url, userName, password);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; table naming is not implemented
+     */
+    @Override
+    protected String createTableName(String splitId) {
+        throw new UnsupportedOperationException("SplitByTimeMultiTableSink.createTableName is not implemented");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; split resolution is not implemented
+     */
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        throw new UnsupportedOperationException("SplitByTimeMultiTableSink.getSplitFromMessage is not implemented");
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java
new file mode 100644
index 0000000..c14bbc3
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink.db;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.sink.DBSink;
+import org.junit.Test;
+
+/**
+ * Smoke tests for {@link DBSink}: both tests replace the SQL execution with a print, so the
+ * generated statements can be inspected without a real table.
+ */
+public class DBWriteOnlyChannelTest {
+
+    /** Number of sample messages pushed through the sink by each test. */
+    private static final int MESSAGE_COUNT = 10;
+
+    private static final String URL = "jdbc:mysql://XXXXX:3306/yundun_soc?useUnicode=true&characterEncoding=utf8&autoReconnect=true";
+    protected static final String USER_NAME = "XXXX";
+    protected static final String PASSWORD = "XXXX";
+
+    /** Template mode: the sink is configured with an insert template. */
+    @Test
+    public void testOutputBySQL() {
+        String sql = "insert into table(name,age) values('#{name}',#{age})";
+        DBSink sink = new DBSink(sql, URL, USER_NAME, PASSWORD) {
+
+            /**
+             * The table is not real, so executing would fail; print the SQL instead.
+             */
+            @Override
+            protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+                System.out.println(sql);
+            }
+        };
+        addSampleMessages(sink);
+        sink.flush();
+    }
+
+    /** Metadata mode: the sink is configured with metadata derived from a sample message. */
+    @Test
+    public void testOutputByMetaData() {
+        DBSink sink = new DBSink() {
+            /**
+             * The table is not real, so executing would fail; print the SQL instead.
+             */
+            @Override
+            protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+                System.out.println(sql);
+            }
+        };
+        JSONObject sample = new JSONObject();
+        sample.put("name", "chris");
+        sample.put("age", 18);
+        MetaData metaData = MetaData.createMetaData(sample);
+        metaData.setTableName("tableName");
+        sink.setMetaData(metaData);
+        addSampleMessages(sink);
+        sink.flush();
+    }
+
+    /** Adds {@link #MESSAGE_COUNT} messages of the form {name: "chris<i>", age: <i>} to the sink. */
+    private static void addSampleMessages(DBSink sink) {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            JSONObject msg = new JSONObject();
+            msg.put("name", "chris" + i);
+            msg.put("age", i);
+            IMessage message = new Message(msg);
+            sink.batchAdd(message);
+        }
+    }
+
+}