You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/07 16:04:15 UTC
[pulsar] branch master updated: [pulsar-io][jdbc sink]Support
delete and update event for JDBC Sink (#4358)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7d2141b [pulsar-io][jdbc sink]Support delete and update event for JDBC Sink (#4358)
7d2141b is described below
commit 7d2141b92d28117bee05499caca4c7a27839d025
Author: tuteng <eg...@gmail.com>
AuthorDate: Mon Jul 8 00:04:11 2019 +0800
[pulsar-io][jdbc sink]Support delete and update event for JDBC Sink (#4358)
### Motivation
Currently, the JDBC Sink does not support delete or update events.
This change adds support for delete and update events.
### Modifications
Add support for delete and update events.
Add documentation for the JDBC Sink. https://github.com/apache/pulsar/issues/4073
### Verifying this change
Local unit tests pass.
Integration tests pass.
---
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 75 +++++---
.../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java | 21 ++-
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 14 +-
.../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 100 ++++++++++-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 191 ++++++++++++++++-----
.../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java | 36 +++-
site2/docs/io-connectors.md | 1 +
site2/docs/io-jdbc.md | 23 +++
.../integration/functions/PulsarFunctionsTest.java | 119 +++++++++++--
.../tests/integration/io/JdbcSinkTester.java | 12 +-
10 files changed, 495 insertions(+), 97 deletions(-)
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index a4c589f..40e2c23 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,6 +52,14 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private JdbcUtils.TableId tableId;
private PreparedStatement insertStatement;
+ private PreparedStatement updateStatment;
+ private PreparedStatement deleteStatment;
+
+
+ protected static final String ACTION = "ACTION";
+ protected static final String INSERT = "INSERT";
+ protected static final String UPDATE = "UPDATE";
+ protected static final String DELETE = "DELETE";
protected JdbcUtils.TableDefinition tableDefinition;
@@ -87,8 +96,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
tableName = jdbcSinkConfig.getTableName();
tableId = JdbcUtils.getTableId(connection, tableName);
- tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
- insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+ // Init PreparedStatement include insert, delete, update
+ initStatement();
timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
@@ -100,6 +109,28 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}
+ private void initStatement() throws Exception {
+ List<String> keyList = Lists.newArrayList();
+ String key = jdbcSinkConfig.getKey();
+ if (key !=null && !key.isEmpty()) {
+ keyList = Arrays.asList(key.split(","));
+ }
+ List<String> nonKeyList = Lists.newArrayList();
+ String nonKey = jdbcSinkConfig.getNonKey();
+ if (nonKey != null && !nonKey.isEmpty()) {
+ nonKeyList = Arrays.asList(nonKey.split(","));
+ }
+
+ tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
+ insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+ if (!nonKeyList.isEmpty()) {
+ updateStatment = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
+ }
+ if (!keyList.isEmpty()) {
+ deleteStatment = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
+ }
+ }
+
@Override
public void close() throws Exception {
if (!connection.getAutoCommit()) {
@@ -115,11 +146,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void write(Record<T> record) throws Exception {
int number;
- synchronized (incomingList) {
+ synchronized (this) {
incomingList.add(record);
number = incomingList.size();
}
-
if (number == batchSize) {
flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
}
@@ -128,8 +158,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
// bind value with a PreparedStetement
public abstract void bindValue(
PreparedStatement statement,
- Record<T> message) throws Exception;
-
+ Record<T> message, String action) throws Exception;
private void flush() {
// if not in flushing state, do flush, else return;
@@ -140,8 +169,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
if (!swapList.isEmpty()) {
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
}
-
- synchronized (incomingList) {
+ synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();
@@ -150,22 +178,24 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
incomingList = tmpList;
}
- int updateCount = 0;
- boolean noInfo = false;
+ int count = 0;
try {
// bind each record value
for (Record<T> record : swapList) {
- bindValue(insertStatement, record);
- insertStatement.addBatch();
- record.ack();
- }
-
- for (int updates : insertStatement.executeBatch()) {
- if (updates == Statement.SUCCESS_NO_INFO) {
- noInfo = true;
- continue;
+ String action = record.getProperties().get(ACTION);
+ if (action != null && action.equals(DELETE)) {
+ bindValue(deleteStatment, record, action);
+ count += 1;
+ deleteStatment.execute();
+ } else if (action != null && action.equals(UPDATE)) {
+ bindValue(updateStatment, record, action);
+ count += 1;
+ updateStatment.execute();
+ } else if (action != null && action.equals(INSERT)){
+ bindValue(insertStatement, record, action);
+ count += 1;
+ insertStatement.execute();
}
- updateCount += updateCount;
}
connection.commit();
swapList.forEach(tRecord -> tRecord.ack());
@@ -174,14 +204,15 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
swapList.forEach(tRecord -> tRecord.fail());
}
- if (swapList.size() != updateCount) {
- log.error("Update count {} not match total number of records {}", updateCount, swapList.size());
+ if (swapList.size() != count) {
+ log.error("Update count {} not match total number of records {}", count, swapList.size());
}
// finish flush
if (log.isDebugEnabled()) {
log.debug("Finish flush, queue size: {}", swapList.size());
}
+ swapList.clear();
isFlushing.set(false);
} else {
if (log.isDebugEnabled()) {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index 640972a..de146c4 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,6 +20,9 @@
package org.apache.pulsar.io.jdbc;
import java.sql.PreparedStatement;
+import java.util.List;
+
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
@@ -33,7 +36,7 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
@Connector(
name = "jdbc",
type = IOType.SINK,
- help = "A simple JDBC sink that writes pulser messages to a database table",
+ help = "A simple JDBC sink that writes pulsar messages to a database table",
configClass = JdbcSinkConfig.class
)
@Slf4j
@@ -41,16 +44,24 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
@Override
public void bindValue(PreparedStatement statement,
- Record<GenericRecord> message) throws Exception {
+ Record<GenericRecord> message, String action) throws Exception {
GenericRecord record = message.getValue();
+ List<ColumnId> columns = Lists.newArrayList();
+ if (action == null || action.equals(INSERT)) {
+ columns = tableDefinition.getColumns();
+ } else if (action.equals(DELETE)){
+ columns.addAll(tableDefinition.getKeyColumns());
+ } else if (action.equals(UPDATE)){
+ columns.addAll(tableDefinition.getNonKeyColumns());
+ columns.addAll(tableDefinition.getKeyColumns());
+ }
int index = 1;
- for (ColumnId columnId : tableDefinition.getColumns()) {
+ for (ColumnId columnId : columns) {
String colName = columnId.getName();
Object obj = record.getField(colName);
setColumnValue(statement, index++, obj);
- log.info("set column value: {}", obj.toString());
}
}
@@ -66,7 +77,7 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
} else if (value instanceof Boolean) {
statement.setBoolean(index, (Boolean) value);
} else if (value instanceof String) {
- statement.setString(index, (String )value);
+ statement.setString(index, (String)value);
} else if (value instanceof Short) {
statement.setShort(index, (Short) value);
} else {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3bfc72c..7e238a0 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -66,7 +66,19 @@ public class JdbcSinkConfig implements Serializable {
help = "The name of the table this connector writes messages to"
)
private String tableName;
-
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Fields used in update events. A comma-separated list."
+ )
+ private String nonKey;
+ // Optional
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Fields used in where condition of update and delete Events. A comma-separated list."
+ )
+ private String key;
// Optional
@FieldDoc(
required = false,
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
index e959909..21fde90 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -28,8 +28,10 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.StringJoiner;
import java.util.stream.IntStream;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -70,7 +72,6 @@ public class JdbcUtils {
private final int position;
}
- @Data(staticConstructor = "of")
@Setter
@Getter
@EqualsAndHashCode
@@ -78,6 +79,29 @@ public class JdbcUtils {
public static class TableDefinition {
private final TableId tableId;
private final List<ColumnId> columns;
+ private final List<ColumnId> nonKeyColumns;
+ private final List<ColumnId> keyColumns;
+
+ private TableDefinition(TableId tableId, List<ColumnId> columns) {
+ this(tableId, columns, null, null);
+ }
+ private TableDefinition(TableId tableId, List<ColumnId> columns,
+ List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+ this.tableId = tableId;
+ this.columns = columns;
+ this.nonKeyColumns = nonKeyColumns;
+ this.keyColumns = keyColumns;
+ }
+
+ public static TableDefinition of(TableId tableId, List<ColumnId> columns) {
+ return new TableDefinition(tableId, columns);
+ }
+
+ public static TableDefinition of(TableId tableId, List<ColumnId> columns,
+ List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {
+ return new TableDefinition(tableId, columns, nonKeyColumns, keyColumns);
+ }
+
}
/**
@@ -130,8 +154,10 @@ public class JdbcUtils {
/**
* Get the {@link TableDefinition} for the given table.
*/
- public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception {
- TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList());
+ public static TableDefinition getTableDefinition(
+ Connection connection, TableId tableId, List<String> keyList, List<String> nonKeyList) throws Exception {
+ TableDefinition table = TableDefinition.of(
+ tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
try (ResultSet rs = connection.getMetaData().getColumns(
tableId.getCatalogName(),
@@ -146,7 +172,23 @@ public class JdbcUtils {
final String typeName = rs.getString(6);
final int position = rs.getInt(17);
- table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position));
+ ColumnId columnId = ColumnId.of(tableId, columnName, sqlDataType, typeName, position);
+ table.columns.add(columnId);
+ if (keyList != null) {
+ keyList.forEach((key) -> {
+ if (key.equals(columnName)) {
+ table.keyColumns.add(columnId);
+ }
+ });
+ }
+ if (nonKeyList != null) {
+ nonKeyList.forEach((key) -> {
+ if (key.equals(columnName)) {
+ table.nonKeyColumns.add(columnId);
+ }
+ });
+ }
+
if (log.isDebugEnabled()) {
log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position);
}
@@ -175,4 +217,54 @@ public class JdbcUtils {
return connection.prepareStatement(insertSQL);
}
+ public static String combationWhere(List<ColumnId> columnIds) {
+ StringBuilder builder = new StringBuilder();
+ if (!columnIds.isEmpty()) {
+ builder.append(" WHERE ");
+ StringJoiner whereJoiner = new StringJoiner(" AND ");
+ columnIds.forEach((columnId -> {
+ StringJoiner equals = new StringJoiner("=");
+ equals.add(columnId.getName()).add("?");
+ whereJoiner.add(equals.toString());
+ }));
+ builder.append(whereJoiner.toString());
+ return builder.toString();
+ }
+ return "";
+ }
+
+ public static String buildUpdateSql(TableDefinition table) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("UPDATE ");
+ builder.append(table.tableId.getTableName());
+ builder.append(" SET ");
+ StringJoiner setJoiner = new StringJoiner(",");
+
+ table.nonKeyColumns.forEach((columnId) ->{
+ StringJoiner equals = new StringJoiner("=");
+ equals.add(columnId.getName()).add("? ");
+ setJoiner.add(equals.toString());
+ });
+ builder.append(setJoiner.toString());
+ builder.append(combationWhere(table.keyColumns));
+ return builder.toString();
+ }
+
+ public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException {
+ return connection.prepareStatement(updateSQL);
+ }
+
+ public static String buildDeleteSql(TableDefinition table) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("DELETE ");
+ builder.append("FROM ");
+ builder.append(table.tableId.getTableName());
+ builder.append(combationWhere(table.keyColumns));
+ return builder.toString();
+ }
+
+ public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException {
+ return connection.prepareStatement(deleteSQL);
+ }
+
}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index aec9d7e..583c474 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -49,7 +49,8 @@ import static org.mockito.Mockito.when;
@Slf4j
public class JdbcSinkTest {
private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
- private Message<GenericRecord> message;
+ private JdbcAutoSchemaSink jdbcSink;
+ private final String tableName = "TestOpenAndWriteSink";
/**
* A Simple class to test jdbc class
@@ -66,79 +67,179 @@ public class JdbcSinkTest {
@BeforeMethod
public void setUp() throws Exception {
sqliteUtils.setUp();
- }
+ sqliteUtils.createTable(
+ "CREATE TABLE " + tableName + "(" +
+ " field1 TEXT," +
+ " field2 TEXT," +
+ " field3 INTEGER," +
+ "PRIMARY KEY (field1));"
+ );
- @AfterMethod
- public void tearDown() throws Exception {
- sqliteUtils.tearDown();
- }
+ // prepare data for udpate sql
+ String updateSql = "insert into " + tableName + " values('ValueOfField4', 'ValueOfField4', 4)";
+ sqliteUtils.execute(updateSql);
+
+ // prepare data for delete sql
+ String deleteSql = "insert into " + tableName + " values('ValueOfField5', 'ValueOfField5', 5)";
+ sqliteUtils.execute(deleteSql);
- @Test
- public void TestOpenAndWriteSink() throws Exception {
- message = mock(MessageImpl.class);
- JdbcAutoSchemaSink jdbcSink;
Map<String, Object> conf;
- String tableName = "TestOpenAndWriteSink";
- GenericSchema<GenericRecord> genericAvroSchema;
String jdbcUrl = sqliteUtils.sqliteUri();
+
conf = Maps.newHashMap();
conf.put("jdbcUrl", jdbcUrl);
conf.put("tableName", tableName);
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ // change batchSize to 1, to flush on each write.
+ conf.put("batchSize", 1);
jdbcSink = new JdbcAutoSchemaSink();
+ jdbcSink = new JdbcAutoSchemaSink();
- sqliteUtils.createTable(
- "CREATE TABLE " + tableName + "(" +
- " field1 TEXT," +
- " field2 TEXT," +
- " field3 INTEGER," +
- "PRIMARY KEY (field1));"
- );
+ // open should success
+ jdbcSink.open(conf, null);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ sqliteUtils.tearDown();
+ jdbcSink.close();
+ }
+
+ @Test
+ public void TestOpenAndWriteSink() throws Exception {
+ Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
- Foo obj = new Foo();
- obj.setField1("ValueOfField1");
- obj.setField2("ValueOfField1");
- obj.setField3(3);
+ Foo insertObj = new Foo();
+ insertObj.setField1("ValueOfField1");
+ insertObj.setField2("ValueOfField1");
+ insertObj.setField3(3);
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
- byte[] bytes = schema.encode(obj);
+ byte[] insertBytes = schema.encode(insertObj);
- Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
- .message(message)
- .topicName("fake_topic_name")
+ Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+ .message(insertMessage)
+ .topicName("fake_topic_name").ackFunction(new Runnable(){
+ public void run(){
+ }
+ })
.build();
genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
- when(message.getValue())
- .thenReturn(genericAvroSchema.decode(bytes));
+ Map<String, String> insertProperties = Maps.newHashMap();
+ insertProperties.put("ACTION", "INSERT");
+ when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+ when(insertMessage.getProperties()).thenReturn(insertProperties);
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- obj.toString(),
- message.getValue().toString(),
- record.getValue().toString());
-
- // change batchSize to 1, to flush on each write.
- conf.put("batchSize", 1);
- // open should success
- jdbcSink.open(conf, null);
+ insertObj.toString(),
+ insertMessage.getValue().toString(),
+ insertRecord.getValue().toString());
// write should success.
- jdbcSink.write(record);
+ jdbcSink.write(insertRecord);
log.info("executed write");
// sleep to wait backend flush complete
- Thread.sleep(500);
+ Thread.sleep(1000);
// value has been written to db, read it out and verify.
- String querySql = "SELECT * FROM " + tableName;
- sqliteUtils.select(querySql, (resultSet) -> {
- Assert.assertEquals(obj.getField1(), resultSet.getString(1));
- Assert.assertEquals(obj.getField2(), resultSet.getString(2));
- Assert.assertEquals(obj.getField3(), resultSet.getInt(3));
+ String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+ int count = sqliteUtils.select(querySql, (resultSet) -> {
+ Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+ Assert.assertEquals(insertObj.getField2(), resultSet.getString(2));
+ Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
});
+ Assert.assertEquals(count, 1);
- jdbcSink.close();
+ }
+
+
+ @Test
+ public void TestUpdateAction() throws Exception {
+
+ AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Foo updateObj = new Foo();
+ updateObj.setField1("ValueOfField3");
+ updateObj.setField2("ValueOfField3");
+ updateObj.setField3(4);
+
+ byte[] updateBytes = schema.encode(updateObj);
+ Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+ Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
+ .message(updateMessage)
+ .topicName("fake_topic_name").ackFunction(new Runnable(){
+ public void run(){
+ }
+ })
+ .build();
+
+ GenericSchema<GenericRecord> updateGenericAvroSchema;
+ updateGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ Map<String, String> updateProperties = Maps.newHashMap();
+ updateProperties.put("ACTION", "UPDATE");
+ when(updateMessage.getValue()).thenReturn(updateGenericAvroSchema.decode(updateBytes));
+ when(updateMessage.getProperties()).thenReturn(updateProperties);
+ log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+ updateObj.toString(),
+ updateMessage.getValue().toString(),
+ updateRecord.getValue().toString());
+
+ jdbcSink.write(updateRecord);
+
+ Thread.sleep(1000);
+
+ // value has been written to db, read it out and verify.
+ String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4";
+ sqliteUtils.select(updateQuerySql, (resultSet) -> {
+ Assert.assertEquals(updateObj.getField1(), resultSet.getString(1));
+ Assert.assertEquals(updateObj.getField2(), resultSet.getString(2));
+ Assert.assertEquals(updateObj.getField3(), resultSet.getInt(3));
+ });
+ }
+
+ @Test
+ public void TestDeleteAction() throws Exception {
+
+ AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Foo deleteObj = new Foo();
+ deleteObj.setField3(5);
+
+ byte[] deleteBytes = schema.encode(deleteObj);
+ Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+ Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
+ .message(deleteMessage)
+ .topicName("fake_topic_name").ackFunction(new Runnable(){
+ public void run(){
+ }
+ })
+ .build();
+
+ GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ Map<String, String> deleteProperties = Maps.newHashMap();
+ deleteProperties.put("ACTION", "DELETE");
+ when(deleteMessage.getValue()).thenReturn(deleteGenericAvroSchema.decode(deleteBytes));
+ when(deleteMessage.getProperties()).thenReturn(deleteProperties);
+ log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+ deleteObj.toString(),
+ deleteMessage.getValue().toString(),
+ deleteRecord.getValue().toString());
+
+ jdbcSink.write(deleteRecord);
+
+ Thread.sleep(1000);
+
+ // value has been written to db, read it out and verify.
+ String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";
+ Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> {}), 0);
}
}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
index d58802d..408f477 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -22,9 +22,12 @@ package org.apache.pulsar.io.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
import org.apache.pulsar.io.jdbc.JdbcUtils.TableId;
+import org.assertj.core.util.Lists;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -75,21 +78,44 @@ public class JdbcUtilsTest {
// Test get getTableDefinition
log.info("verify getTableDefinition");
- TableDefinition table = JdbcUtils.getTableDefinition(connection, id);
+ List<String> keyList = Lists.newArrayList();
+ keyList.add("firstName");
+ keyList.add("lastName");
+ List<String> nonKeyList = Lists.newArrayList();
+ nonKeyList.add("age");
+ nonKeyList.add("long");
+ TableDefinition table = JdbcUtils.getTableDefinition(connection, id, keyList, nonKeyList);
Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
Assert.assertEquals(table.getColumns().get(2).getName(), "age");
Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
Assert.assertEquals(table.getColumns().get(7).getName(), "float");
Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC");
-
+ Assert.assertEquals(table.getKeyColumns().get(0).getName(), "firstName");
+ Assert.assertEquals(table.getKeyColumns().get(0).getTypeName(), "TEXT");
+ Assert.assertEquals(table.getKeyColumns().get(1).getName(), "lastName");
+ Assert.assertEquals(table.getKeyColumns().get(1).getTypeName(), "TEXT");
+ Assert.assertEquals(table.getNonKeyColumns().get(0).getName(), "age");
+ Assert.assertEquals(table.getNonKeyColumns().get(0).getTypeName(), "INTEGER");
+ Assert.assertEquals(table.getNonKeyColumns().get(1).getName(), "long");
+ Assert.assertEquals(table.getNonKeyColumns().get(1).getTypeName(), "INTEGER");
// Test get getTableDefinition
log.info("verify buildInsertSql");
- String expctedStatement = "INSERT INTO " + tableName +
+ String expctedInsertStatement = "INSERT INTO " + tableName +
"(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
" VALUES(?,?,?,?,?,?,?,?,?,?)";
- String statement = JdbcUtils.buildInsertSql(table);
- Assert.assertEquals(statement, expctedStatement);
+ String insertStatement = JdbcUtils.buildInsertSql(table);
+ Assert.assertEquals(insertStatement, expctedInsertStatement);
+ log.info("verify buildUpdateSql");
+ String expectedUpdateStatement = "UPDATE " + tableName +
+ " SET age=? ,long=? WHERE firstName=? AND lastName=?";
+ String updateStatement = JdbcUtils.buildUpdateSql(table);
+ Assert.assertEquals(updateStatement, expectedUpdateStatement);
+ log.info("verify buildDeleteSql");
+ String expectedDeleteStatement = "DELETE FROM " + tableName +
+ " WHERE firstName=? AND lastName=?";
+ String deleteStatement = JdbcUtils.buildDeleteSql(table);
+ Assert.assertEquals(deleteStatement, expectedDeleteStatement);
}
}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index e5e112d..0b70410 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -27,3 +27,4 @@ Pulsar Functions cluster.
- [Redis Sink Connector](io-redis.md#sink)
- [Solr Sink Connector](io-solr.md#sink)
- [InfluxDB Sink Connector](io-influxdb.md#sink)
+- [JDBC Sink Connector](io-jdbc.md)
diff --git a/site2/docs/io-jdbc.md b/site2/docs/io-jdbc.md
new file mode 100644
index 0000000..1ded39a
--- /dev/null
+++ b/site2/docs/io-jdbc.md
@@ -0,0 +1,23 @@
+---
+id: io-jdbc
+title: JDBC Connector
+sidebar_label: JDBC Connector
+---
+
+## Sink
+
+The JDBC Sink Connector pulls messages from Pulsar topics and persists them to a MySQL or SQLite database.
+It currently supports INSERT, DELETE and UPDATE operations, selected per message by the `ACTION` message property.
+
+### Sink Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| userName | `false` | `` | Username used to connect to the database specified by `jdbcUrl`. |
+| password | `false` | `` | Password used to connect to the database specified by `jdbcUrl`. |
+| jdbcUrl | `true` | `` | The JDBC url of the database this connector connects to. |
+| tableName | `true` | `` | The name of the table this connector writes messages to. |
+| nonKey | `false` | `` | Fields set (in the SET clause) by update events. A comma-separated list. |
+| key | `false` | `` | Fields used in the WHERE condition of update and delete events. A comma-separated list. |
+| timeoutMs | `false` | `500` | The JDBC operation timeout in milliseconds; also used as the interval between periodic flushes. |
+| batchSize | `false` | `200` | The batch size of updates made to the database. |
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d7e4404..d96c3a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;
+import org.testng.collections.Maps;
import java.util.Collections;
import java.util.HashSet;
@@ -174,16 +175,34 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// produce messages
Map<String, String> kvs;
if (tester instanceof JdbcSinkTester) {
- kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+ kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+ // wait for sink to process messages
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
+ kvs = produceSchemaUpdateMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+
+ // wait for sink to process messages
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20);
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
+ kvs = produceSchemaDeleteMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
+
+ // wait for sink to process messages
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages + 20 + 20);
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
} else {
kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+ // wait for sink to process messages
+ waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
+ // validate the sink result
+ tester.validateSinkResult(kvs);
}
- // wait for sink to process messages
- waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
-
- // validate the sink result
- tester.validateSinkResult(kvs);
// update the sink
updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
@@ -364,7 +383,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
// This for JdbcSinkTester
- protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+ protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
int numMessages,
Schema<Foo> schema) throws Exception {
@Cleanup
@@ -380,16 +399,92 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String key = "key-" + i;
JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
- obj.setField1("field1_" + i);
- obj.setField2("field2_" + i);
+ obj.setField1("field1_insert_" + i);
+ obj.setField2("field2_insert_" + i);
obj.setField3(i);
String value = new String(schema.encode(obj));
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("ACTION", "INSERT");
kvs.put(key, value);
+ kvs.put("ACTION", "INSERT");
producer.newMessage()
- .key(key)
- .value(obj)
- .send();
+ .properties(properties)
+ .key(key)
+ .value(obj)
+ .send();
+ }
+ return kvs;
+ }
+
+ // JdbcSinkTester helper: produces numMessages UPDATE-tagged Foo records (same field1 key values as the insert phase).
+ protected Map<String, String> produceSchemaUpdateMessagesToInputTopic(String inputTopicName,
+ int numMessages,
+ Schema<Foo> schema) throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup
+ Producer<Foo> producer = client.newProducer(schema)
+ .topic(inputTopicName)
+ .create();
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>(); // message key -> encoded value, plus one "ACTION" sentinel entry
+ log.info("update start");
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+
+ JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
+ obj.setField1("field1_insert_" + i); // unchanged from the insert phase: JdbcSinkTester configures field1 as the "key" column
+ obj.setField2("field2_update_" + i); // the only field whose value differs from the insert phase
+ obj.setField3(i);
+ String value = new String(schema.encode(obj)); // NOTE(review): decodes the encoded bytes with the platform default charset
+ Map<String, String> properties = Maps.newHashMap(); // rebuilt every iteration although its content never changes
+ properties.put("ACTION", "UPDATE"); // presumably read by the JDBC sink to choose the SQL operation -- confirm in JdbcAbstractSink
+
+ kvs.put(key, value);
+ kvs.put("ACTION", "UPDATE"); // sentinel, not a record key: validateSinkResult branches on kvs.get("ACTION"); re-put each pass
+ producer.newMessage()
+ .properties(properties)
+ .key(key)
+ .value(obj)
+ .send();
+ }
+ log.info("update end");
+ return kvs;
+ }
+
+ // JdbcSinkTester helper: produces numMessages DELETE-tagged Foo records whose field1 key values match the insert phase.
+ protected Map<String, String> produceSchemaDeleteMessagesToInputTopic(String inputTopicName,
+ int numMessages,
+ Schema<Foo> schema) throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup
+ Producer<Foo> producer = client.newProducer(schema)
+ .topic(inputTopicName)
+ .create();
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>(); // message key -> encoded value, plus one "ACTION" sentinel entry
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+
+ JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
+ obj.setField1("field1_insert_" + i); // must match the inserted rows: JdbcSinkTester configures field1 as the "key" column
+ obj.setField2("field2_update_" + i); // mirrors the values written by the update phase
+ obj.setField3(i);
+ String value = new String(schema.encode(obj)); // NOTE(review): decodes the encoded bytes with the platform default charset
+ Map<String, String> properties = Maps.newHashMap(); // rebuilt every iteration although its content never changes
+ properties.put("ACTION", "DELETE"); // presumably read by the JDBC sink to choose the SQL operation -- confirm in JdbcAbstractSink
+
+ kvs.put(key, value);
+ kvs.put("ACTION", "DELETE"); // sentinel: validateSinkResult then asserts the table query returns no rows
+ producer.newMessage()
+ .properties(properties)
+ .key(key)
+ .value(obj)
+ .send();
}
return kvs;
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index da4331a..9904709 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -18,9 +18,6 @@
*/
package org.apache.pulsar.tests.integration.io;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
import com.github.dockerjava.api.command.CreateContainerCmd;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -38,6 +35,9 @@ import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.MySQLContainer;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
/**
* A tester for testing jdbc sink.
* This will use MySql as DB server
@@ -71,6 +71,8 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
// container default value is test
sinkConfig.put("userName", "test");
sinkConfig.put("password", "test");
+ sinkConfig.put("nonKey", "field2,field3");
+ sinkConfig.put("key", "field1");
sinkConfig.put("tableName", tableName);
sinkConfig.put("batchSize", 1);
}
@@ -127,6 +129,10 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
PreparedStatement statement = connection.prepareStatement(querySql);
rs = statement.executeQuery();
+ if (kvs.get("ACTION").equals("DELETE")) {
+ assertFalse(rs.first());
+ return;
+ }
while (rs.next()) {
String field1 = rs.getString(1);
String field2 = rs.getString(2);