You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/05 12:54:04 UTC
[rocketmq-streams] 06/46: add channel-db module
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit ed97ca3daee8bd3e71c4c5150a54381867a07756
Author: vv <ze...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:03:59 2021 +0800
add channel-db module
---
rocketmq-streams-channel-db/pom.xml | 21 ++
.../streams/db/sink/AbstractMultiTableSink.java | 150 +++++++++++++
.../apache/rocketmq/streams/db/sink/DBSink.java | 239 +++++++++++++++++++++
.../rocketmq/streams/db/sink/DBSinkBuilder.java | 76 +++++++
.../streams/db/sink/SelfMultiTableSink.java | 53 +++++
.../streams/db/sink/SplitBySerialNumber.java | 36 ++++
.../streams/db/sink/SplitByTimeMultiTableSink.java | 36 ++++
.../streams/db/sink/db/DBWriteOnlyChannelTest.java | 84 ++++++++
8 files changed, 695 insertions(+)
diff --git a/rocketmq-streams-channel-db/pom.xml b/rocketmq-streams-channel-db/pom.xml
new file mode 100755
index 0000000..10d760e
--- /dev/null
+++ b/rocketmq-streams-channel-db/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Maven module that packages the database channel classes (DBSink and the multi-table sinks) as a jar. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rocketmq-streams-channel-db</artifactId>
+ <name>ROCKETMQ STREAMS :: channel-db</name>
+ <packaging>jar</packaging>
+ <dependencies>
+ <!-- NOTE(review): no version is declared here; presumably it is managed by the parent POM. Confirm. -->
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-db-operator</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
new file mode 100644
index 0000000..6547615
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base class for sinks that write to several tables through one delegate {@link DBSink} per split id.
+ * Subclasses decide which split a message belongs to ({@link #getSplitFromMessage(IMessage)}) and which table
+ * a split id is written to ({@link #createTableName(String)}).
+ */
+public abstract class AbstractMultiTableSink extends DBSink {
+    /** Delegate sinks keyed by split id. */
+    protected transient ConcurrentHashMap<String, DBSink> tableSinks = new ConcurrentHashMap<>();
+    /** Messages added through this sink minus the sizes reported by the delegate caches when they flush. */
+    protected transient AtomicLong messageCount = new AtomicLong(0);
+    protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;
+
+    /**
+     * @param url      jdbc url handed to every delegate sink
+     * @param userName database user handed to every delegate sink
+     * @param password database password handed to every delegate sink
+     */
+    public AbstractMultiTableSink(String url, String userName, String password) {
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+    }
+
+    /**
+     * Adds the message to the delegate sink of the given split and flushes that split once the overall
+     * number of unflushed messages reaches the batch size.
+     *
+     * @return the result of the delegate's {@code batchAdd}
+     */
+    @Override
+    public boolean batchAdd(IMessage message, ISplit split) {
+        DBSink sink = getOrCreateDBSink(split.getQueueId());
+        boolean success = sink.batchAdd(message, split);
+        long count = messageCount.incrementAndGet();
+        if (count >= getBatchSize()) {
+            Set<String> queueIds = new HashSet<>();
+            queueIds.add(split.getQueueId());
+            flush(queueIds);
+        }
+        return success;
+    }
+
+    /**
+     * Resolves the split of the message and delegates to {@link #batchAdd(IMessage, ISplit)}.
+     *
+     * @throws NullPointerException if the subclass does not return a split for the message
+     */
+    @Override
+    public boolean batchAdd(IMessage message) {
+        ISplit split = getSplitFromMessage(message);
+        if (split == null) {
+            // Same exception type as the former implicit dereference, but with a message that names the cause.
+            throw new NullPointerException(getClass().getSimpleName() + ".getSplitFromMessage returned null");
+        }
+        return batchAdd(message, split);
+    }
+
+    @Override
+    public boolean batchSave(List<IMessage> messages) {
+        throw new RuntimeException("can not support this method");
+    }
+
+    /**
+     * Flushes the delegate sinks of the given split ids. Split ids without a delegate sink are skipped:
+     * nothing has been added for them, so no sink is created just to flush it.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean flush(Set<String> splitIds) {
+        if (splitIds == null) {
+            return true;
+        }
+        for (String splitId : splitIds) {
+            DBSink sink = this.tableSinks.get(splitId);
+            if (sink != null) {
+                sink.flush();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Flushes every delegate sink.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean flush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.flush();
+        }
+        return true;
+    }
+
+    @Override
+    public void openAutoFlush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.openAutoFlush();
+        }
+    }
+
+    @Override
+    public void closeAutoFlush() {
+        for (DBSink dbSink : tableSinks.values()) {
+            dbSink.closeAutoFlush();
+        }
+    }
+
+    /**
+     * Returns the delegate sink of the split, creating and initialising it on first use.
+     * Creation happens under a lock so that concurrent callers never initialise a second sink
+     * (with auto flush opened) for the same split only to discard it.
+     */
+    protected DBSink getOrCreateDBSink(String splitId) {
+        DBSink sink = this.tableSinks.get(splitId);
+        if (sink != null) {
+            return sink;
+        }
+        synchronized (this) {
+            // Re-check: another thread may have registered the sink while this one waited for the lock.
+            sink = this.tableSinks.get(splitId);
+            if (sink == null) {
+                sink = new DBSink();
+                sink.setUrl(url);
+                sink.setPassword(password);
+                sink.setUserName(userName);
+                sink.setTableName(createTableName(splitId));
+                sink.openAutoFlush();
+                sink.setBatchSize(batchSize);
+                sink.setJdbcDriver(this.jdbcDriver);
+                sink.setMessageCache(new SingleDBSinkCache(sink));
+                sink.init();
+                this.tableSinks.put(splitId, sink);
+            }
+            return sink;
+        }
+    }
+
+    /** Maps a split id to the name of the table its messages are written to. */
+    protected abstract String createTableName(String splitId);
+
+    /** Determines the split a message belongs to. */
+    protected abstract ISplit getSplitFromMessage(IMessage message);
+
+    /**
+     * Message cache of a single delegate sink that subtracts every flushed batch from the shared
+     * {@link #messageCount} of the enclosing sink.
+     */
+    protected class SingleDBSinkCache extends MessageCache<IMessage> {
+
+        public SingleDBSinkCache(
+            IMessageFlushCallBack<IMessage> flushCallBack) {
+            super(flushCallBack);
+        }
+
+        @Override
+        public int flush(Set<String> splitIds) {
+            int size = super.flush(splitIds);
+            messageCount.addAndGet(-size);
+            return size;
+        }
+
+        @Override
+        public int flush() {
+            int size = super.flush();
+            messageCount.addAndGet(-size);
+            return size;
+        }
+    }
+
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
new file mode 100644
index 0000000..5732424
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.channel.IChannel;
+import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+
+import java.sql.*;
+import java.util.List;
+
+/**
+ * Sink that writes messages to a database. The insert statement comes either from an insert/replace template
+ * or from a {@link MetaData} object; configure one of the two. Both modes support batch inserts to improve
+ * throughput.
+ * <ul>
+ * <li>SQL template, e.g. {@code insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')}:
+ * the rows of one batch (batchSize) are concatenated into one large statement.</li>
+ * <li>MetaData: describes the type of every field and whether it is required; based on that description the
+ * batch is also concatenated into one large statement.</li>
+ * </ul>
+ */
+public class DBSink extends AbstractSink {
+
+    /** Keyword that precedes the value tuple of an insert statement. */
+    private static final String VALUES_NAME = "values";
+
+    // Insert template, alternative to metaData, e.g.
+    // insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
+    protected String insertSQLTemplate;
+
+    // Meta data can be given instead of insertSQLTemplate.
+    protected MetaData metaData;
+
+    // Table to insert into.
+    protected String tableName;
+
+    @ENVDependence
+    protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
+    @ENVDependence
+    protected String url;
+    @ENVDependence
+    protected String userName;
+    @ENVDependence
+    protected String password;
+
+    /**
+     * The db settings are usually given as names; a name prefix can be supplied for them. If the prefix is
+     * empty, the configure name of this instance is used; if that is empty as well, the simple class name.
+     *
+     * @param insertSQL        sql template
+     * @param dbInfoNamePrefix prefix of the setting names ({@code <prefix>.url}, {@code <prefix>.password},
+     *                         {@code <prefix>.userName}); the real values can be configured in a
+     *                         configuration file
+     */
+    public DBSink(String insertSQL, String dbInfoNamePrefix) {
+        setType(IChannel.TYPE);
+        if (StringUtil.isEmpty(dbInfoNamePrefix)) {
+            dbInfoNamePrefix = getConfigureName();
+        }
+        if (StringUtil.isEmpty(dbInfoNamePrefix)) {
+            dbInfoNamePrefix = this.getClass().getSimpleName();
+        }
+        this.insertSQLTemplate = insertSQL;
+        this.url = dbInfoNamePrefix + ".url";
+        this.password = dbInfoNamePrefix + ".password";
+        this.userName = dbInfoNamePrefix + ".userName";
+    }
+
+    public DBSink() {
+        setType(IChannel.TYPE);
+    }
+
+    public DBSink(String url, String userName, String password) {
+        setType(IChannel.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+    }
+
+    public DBSink(String insertSQL, String url, String userName, String password) {
+        setType(IChannel.TYPE);
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+        this.insertSQLTemplate = insertSQL;
+    }
+
+    /**
+     * Loads the column description of {@link #tableName} (when a table name is set) into {@link #metaData}
+     * and then runs the parent initialisation.
+     *
+     * @return the parent's result, or {@code false} when the driver class is missing or a database call fails
+     */
+    @Override
+    protected boolean initConfigurable() {
+        try {
+            // NOTE(review): the driver class is hard-coded and ignores the jdbcDriver field - confirm intent.
+            Class.forName("com.mysql.jdbc.Driver");
+            if (StringUtil.isNotEmpty(this.tableName)) {
+                // The connection is only needed to read the column description, so release it right away.
+                try (Connection connection = DriverManager.getConnection(url, userName, password);
+                     ResultSet columns = connection.getMetaData().getColumns(connection.getCatalog(), "%", this.tableName, null)) {
+                    this.metaData = MetaData.createMetaData(columns);
+                    this.metaData.setTableName(this.tableName);
+                }
+            }
+            return super.initConfigurable();
+        } catch (ClassNotFoundException | SQLException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    /**
+     * Writes one batch of messages. Without a template the statement is built from {@link #metaData}.
+     * With a template whose variables all live in the values clause, the rows are merged into a single
+     * statement; otherwise one statement is executed per message.
+     *
+     * @return always {@code true}
+     * @throws IllegalStateException if neither an insert template nor meta data is configured
+     */
+    @Override
+    protected boolean batchInsert(List<IMessage> messageList) {
+        if (messageList == null || messageList.isEmpty()) {
+            // Nothing to write: do not create (and destroy) a driver for an empty batch.
+            return true;
+        }
+        boolean noTemplate = StringUtil.isEmpty(insertSQLTemplate);
+        if (noTemplate && metaData == null) {
+            throw new IllegalStateException("DBSink requires either insertSQLTemplate or metaData to build the insert sql");
+        }
+        JDBCDriver dbDataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+        try {
+            // NOTE(review): message values are substituted into the sql text by SQLUtil; whether they are
+            // escaped is not visible here - confirm before writing untrusted data.
+            List<JSONObject> messages = convertJsonObjectFromMessage(messageList);
+            if (noTemplate) {
+                String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
+                sql = sql + SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
+                executeSQL(dbDataSource, sql);
+                return true;
+            }
+            String insertValueSQL = parseInsertValues(insertSQLTemplate);
+            // Rows can only be merged when the template has a values clause and no variable outside of it.
+            boolean mergeable = StringUtil.isNotEmpty(insertValueSQL)
+                && !insertSQLTemplate.replace(insertValueSQL, "").contains("#{");
+            if (!mergeable) {
+                for (JSONObject message : messages) {
+                    executeSQL(dbDataSource, parseSQL(message, insertSQLTemplate));
+                }
+                return true;
+            }
+            StringBuilder valueTuples = new StringBuilder();
+            boolean isFirst = true;
+            for (JSONObject message : messages) {
+                if (isFirst) {
+                    isFirst = false;
+                } else {
+                    valueTuples.append(",");
+                }
+                valueTuples.append(parseSQL(message, insertValueSQL));
+            }
+            executeSQL(dbDataSource, this.insertSQLTemplate.replace(insertValueSQL, valueTuples.toString()));
+            return true;
+        } finally {
+            dbDataSource.destroy();
+        }
+    }
+
+    /** Executes the statement on the given driver; a separate method so that subclasses can intercept it. */
+    protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+        dbDataSource.execute(sql);
+    }
+
+    /**
+     * Extracts the value part of an insert statement, i.e. the text between the first {@code values}
+     * keyword (case-insensitive) and the last {@code )}. Batch inserts work better with it.
+     *
+     * @param insertSQL insert template, may be {@code null}
+     * @return the value part including its closing parenthesis, or {@code null} when the template is
+     *         {@code null} or has no values keyword or closing parenthesis
+     */
+    protected String parseInsertValues(String insertSQL) {
+        if (insertSQL == null) {
+            return null;
+        }
+        // Search on the original text: offsets taken from a lower-cased copy can be shifted by characters
+        // whose case mapping changes the string length.
+        int start = indexOfIgnoreCase(insertSQL, VALUES_NAME);
+        if (start == -1) {
+            return null;
+        }
+        String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
+        int end = valuesSQL.lastIndexOf(')');
+        if (end == -1) {
+            return null;
+        }
+        return valuesSQL.substring(0, end + 1);
+    }
+
+    /** Returns the index of the first case-insensitive occurrence of keyword in text, or -1. */
+    private static int indexOfIgnoreCase(String text, String keyword) {
+        int lastStart = text.length() - keyword.length();
+        for (int i = 0; i <= lastStart; i++) {
+            if (text.regionMatches(true, i, keyword, 0, keyword.length())) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Fills the variables of the sql template with the values of the message. */
+    protected String parseSQL(JSONObject message, String sql) {
+        return SQLUtil.parseIbatisSQL(message, sql);
+    }
+
+    public String getInsertSQLTemplate() {
+        return insertSQLTemplate;
+    }
+
+    public void setInsertSQLTemplate(String insertSQLTemplate) {
+        this.insertSQLTemplate = insertSQLTemplate;
+    }
+
+    public String getJdbcDriver() {
+        return jdbcDriver;
+    }
+
+    public void setJdbcDriver(String jdbcDriver) {
+        this.jdbcDriver = jdbcDriver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public MetaData getMetaData() {
+        return metaData;
+    }
+
+    public void setMetaData(MetaData metaData) {
+        this.metaData = metaData;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
new file mode 100644
index 0000000..ef7ae28
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
+import org.apache.rocketmq.streams.common.channel.sink.ISink;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.metadata.MetaDataField;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import com.google.auto.service.AutoService;
+
+/**
+ * Channel builder registered under the type {@code rds}. It creates a {@link DBSink} whose insert template
+ * is derived from the fields of the given meta data; creating a source is not supported.
+ */
+@AutoService(IChannelBuilder.class)
+@ServiceName(DBSinkBuilder.TYPE)
+public class DBSinkBuilder implements IChannelBuilder {
+    public static final String TYPE = "rds";
+
+    /**
+     * Builds a sink from the properties {@code url}, {@code userName}, {@code password} and
+     * {@code tableName}. The insert template lists every meta data field as a column and fills it from the
+     * message variable of the same name: {@code #{field}} for numeric fields, {@code '#{field}'} otherwise.
+     *
+     * @param properties connection settings and target table
+     * @param metaData   fields of the target table
+     * @return the configured sink
+     */
+    @Override
+    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
+        DBSink sink = new DBSink();
+        sink.setUrl(properties.getProperty("url"));
+        // Read the credentials from the properties. The former literal values remain the fallback so that
+        // setups which never supplied these properties behave as before.
+        sink.setUserName(properties.getProperty("userName", "userName"));
+        sink.setPassword(properties.getProperty("password", "password"));
+        List<MetaDataField> fieldList = metaData.getMetaDataFields();
+        StringBuilder columns = new StringBuilder();
+        StringBuilder values = new StringBuilder();
+        boolean isFirst = true;
+        for (MetaDataField field : fieldList) {
+            String fieldName = field.getFieldName();
+            if (isFirst) {
+                isFirst = false;
+            } else {
+                columns.append(",");
+                values.append(",");
+            }
+            columns.append(fieldName);
+            if (DataTypeUtil.isNumber(field.getDataType())) {
+                // Numbers are written unquoted, but still need the variable placeholder; the bare column
+                // name would never be replaced by the message value.
+                values.append("#{").append(fieldName).append("}");
+            } else {
+                values.append("'#{").append(fieldName).append("}'");
+            }
+        }
+        String sql = "insert into " + properties.getProperty("tableName") + "(" + columns + ")values(" + values + ")";
+        sink.setInsertSQLTemplate(sql);
+        return sink;
+    }
+
+    @Override
+    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
+        throw new RuntimeException("can not support this method");
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
new file mode 100644
index 0000000..96922e6
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
+import org.apache.rocketmq.streams.common.utils.Base64Utils;
+import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
+
+/**
+ * Multi-table sink whose split and table-name rules come from a user supplied
+ * {@link MultiTableSplitFunction}. The function is kept in serialized form as well so that it can be
+ * restored after the configurable has been refreshed.
+ */
+public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner {
+    // Serialized bytes of the user-defined split function, base64 encoded.
+    protected String multiTableSplitFunctionSerializeValue;
+    // NOTE(review): this hides the field of the same name in AbstractMultiTableSink; kept so that the
+    // class layout stays unchanged.
+    protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;
+
+    /**
+     * @param multiTableSplitFunction user function that maps messages to splits and split ids to table names
+     */
+    public SelfMultiTableSink(String url, String userName, String password, MultiTableSplitFunction<IMessage> multiTableSplitFunction) {
+        super(url, userName, password);
+        this.multiTableSplitFunction = multiTableSplitFunction;
+        byte[] bytes = InstantiationUtil.serializeObject(multiTableSplitFunction);
+        multiTableSplitFunctionSerializeValue = Base64Utils.encode(bytes);
+    }
+
+    @Override
+    protected String createTableName(String splitId) {
+        return multiTableSplitFunction.createTableFromSplitId(splitId);
+    }
+
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        return multiTableSplitFunction.createSplit(message);
+    }
+
+    /**
+     * Restores the split function from its base64 encoded serialized form. Without a serialized value
+     * there is nothing to restore and a function that is already set is kept.
+     */
+    @Override
+    public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
+        if (multiTableSplitFunctionSerializeValue == null) {
+            return;
+        }
+        byte[] bytes = Base64Utils.decode(multiTableSplitFunctionSerializeValue);
+        this.multiTableSplitFunction = InstantiationUtil.deserializeObject(bytes);
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
new file mode 100644
index 0000000..c2a49b7
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+
+/**
+ * Placeholder for a multi-table sink that splits by serial number. Neither the split rule nor the table
+ * naming is implemented yet, so both hooks fail fast instead of returning {@code null}, which the parent
+ * class would dereference or use as a table name.
+ */
+public class SplitBySerialNumber extends AbstractMultiTableSink {
+    public SplitBySerialNumber(String url, String userName, String password) {
+        super(url, userName, password);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; the table naming is not implemented
+     */
+    @Override
+    protected String createTableName(String splitId) {
+        throw new UnsupportedOperationException("SplitBySerialNumber.createTableName is not implemented");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; the split rule is not implemented
+     */
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        throw new UnsupportedOperationException("SplitBySerialNumber.getSplitFromMessage is not implemented");
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
new file mode 100644
index 0000000..87a2b3e
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.IMessage;
+
+/**
+ * Placeholder for a multi-table sink that splits by time. Neither the split rule nor the table naming is
+ * implemented yet, so both hooks fail fast instead of returning {@code null}, which the parent class would
+ * dereference or use as a table name.
+ */
+public class SplitByTimeMultiTableSink extends AbstractMultiTableSink {
+    public SplitByTimeMultiTableSink(String url, String userName, String password) {
+        super(url, userName, password);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; the table naming is not implemented
+     */
+    @Override
+    protected String createTableName(String splitId) {
+        throw new UnsupportedOperationException("SplitByTimeMultiTableSink.createTableName is not implemented");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; the split rule is not implemented
+     */
+    @Override
+    protected ISplit getSplitFromMessage(IMessage message) {
+        throw new UnsupportedOperationException("SplitByTimeMultiTableSink.getSplitFromMessage is not implemented");
+    }
+}
diff --git a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java
new file mode 100644
index 0000000..c14bbc3
--- /dev/null
+++ b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.db.sink.db;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.sink.DBSink;
+import org.junit.Test;
+
+/**
+ * Exercises {@link DBSink} in template mode and in meta data mode. The statements are printed instead of
+ * executed, so no real table is needed.
+ */
+public class DBWriteOnlyChannelTest {
+
+    private String URL = "jdbc:mysql://XXXXX:3306/yundun_soc?useUnicode=true&characterEncoding=utf8&autoReconnect=true";
+    protected String USER_NAME = "XXXX";
+    protected String PASSWORD = "XXXX";
+
+    /** Feeds ten messages into a sink that is driven by an insert template. */
+    @Test
+    public void testOutputBySQL() {
+        String sql = "insert into table(name,age) values('#{name}',#{age})";
+        DBSink sink = new DBSink(sql, URL, USER_NAME, PASSWORD) {
+
+            /**
+             * The table is not real, so executing would fail; print the sql instead of executing it.
+             */
+            @Override
+            protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+                System.out.println(sql);
+            }
+        };
+        addTenMessages(sink);
+        sink.flush();
+    }
+
+    /** Feeds ten messages into a sink that is driven by meta data derived from a sample message. */
+    @Test
+    public void testOutputByMetaData() {
+        DBSink sink = new DBSink() {
+            /**
+             * The table is not real, so executing would fail; print the sql instead of executing it.
+             */
+            @Override
+            protected void executeSQL(JDBCDriver dbDataSource, String sql) {
+                System.out.println(sql);
+            }
+        };
+        MetaData metaData = MetaData.createMetaData(person("chris", 18));
+        metaData.setTableName("tableName");
+        sink.setMetaData(metaData);
+        addTenMessages(sink);
+        sink.flush();
+    }
+
+    /** Adds the messages chris0..chris9 with ages 0..9 to the sink. */
+    private static void addTenMessages(DBSink sink) {
+        int index = 0;
+        while (index < 10) {
+            IMessage message = new Message(person("chris" + index, index));
+            sink.batchAdd(message);
+            index++;
+        }
+    }
+
+    /** Builds the json payload with the two columns used by the tests. */
+    private static JSONObject person(String name, int age) {
+        JSONObject json = new JSONObject();
+        json.put("name", name);
+        json.put("age", age);
+        return json;
+    }
+
+}