You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 20:20:44 UTC
[incubator-pulsar] branch master updated: Issue 2313: create a JDBC
sink connector (#2440)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79ac46a Issue 2313: create a JDBC sink connector (#2440)
79ac46a is described below
commit 79ac46a6ae50e48bed4ccb680d1f7945611f0565
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Wed Sep 5 04:20:42 2018 +0800
Issue 2313: create a JDBC sink connector (#2440)
### Motivation
This change adds a basic JDBC sink connector.
### Modifications
Add the jdbc module to the pulsar-io sub-module.
Add unit tests and an integration test for it.
### Result
Unit tests and integration tests pass.
Master Issue: #2442
---
distribution/io/src/assemble/io.xml | 5 +
pom.xml | 9 +-
pulsar-io/jdbc/lombok.config | 23 +++
pulsar-io/jdbc/pom.xml | 96 ++++++++++
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 197 +++++++++++++++++++++
.../apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java | 92 ++++++++++
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 63 +++++++
.../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 178 +++++++++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 135 ++++++++++++++
.../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java | 95 ++++++++++
.../org/apache/pulsar/io/jdbc/SqliteUtils.java | 111 ++++++++++++
pulsar-io/pom.xml | 1 +
tests/integration/pom.xml | 21 +++
.../integration/functions/PulsarFunctionsTest.java | 54 +++++-
.../functions/PulsarFunctionsTestBase.java | 2 +-
.../tests/integration/io/JdbcSinkTester.java | 137 ++++++++++++++
.../tests/integration/suites/PulsarTestSuite.java | 11 ++
18 files changed, 1245 insertions(+), 7 deletions(-)
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 8cf7fce..bb75e84 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -75,6 +75,11 @@
<fileMode>644</fileMode>
</file>
<file>
+ <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source>
+ <outputDirectory>connectors</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
+ <file>
<source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
diff --git a/pom.xml b/pom.xml
index 1ed2802..681f18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,8 @@ flexible messaging model and an intuitive client API.</description>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
<jclouds.version>2.1.1</jclouds.version>
+ <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
+ <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
<presto.version>0.206</presto.version>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
@@ -822,6 +824,11 @@ flexible messaging model and an intuitive client API.</description>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.arquillian.cube</groupId>
<artifactId>arquillian-cube-docker</artifactId>
<version>${arquillian-cube.version}</version>
@@ -1086,7 +1093,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>
-
+
<!-- pulasr-io-connector kinesis : auto generated files from flatbuffer schema -->
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
diff --git a/pulsar-io/jdbc/lombok.config b/pulsar-io/jdbc/lombok.config
new file mode 100644
index 0000000..9a9adee
--- /dev/null
+++ b/pulsar-io/jdbc/lombok.config
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+## This setting works around Jackson deserialization errors such as:
+## com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of ...
+lombok.anyConstructor.addConstructorProperties=true
+config.stopBubbling = true
diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml
new file mode 100644
index 0000000..eed8588
--- /dev/null
+++ b/pulsar-io/jdbc/pom.xml
@@ -0,0 +1,96 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.2.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-jdbc</artifactId>
+ <name>Pulsar IO :: Jdbc</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial</groupId>
+ <artifactId>sqlite-jdbc</artifactId>
+ <version>${sqlite-jdbc.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-jdbc.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
new file mode 100644
index 0000000..425fb57
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink<T> implements Sink<T> {
+ // ----- Runtime fields
+ private JdbcSinkConfig jdbcSinkConfig;
+ @Getter
+ private Connection connection;
+ private String jdbcUrl;
+ private String tableName;
+
+ private JdbcUtils.TableId tableId;
+ private PreparedStatement insertStatement;
+
+ // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
+ protected String schema;
+ protected JdbcUtils.TableDefinition tableDefinition;
+
+ // for flush
+ private List<Record<T>> incomingList;
+ private List<Record<T>> swapList;
+ private AtomicBoolean isFlushing;
+ private int timeoutMs;
+ private int batchSize;
+ private ScheduledExecutorService flushExecutor;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+ jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+ if (jdbcSinkConfig.getJdbcUrl() == null) {
+ throw new IllegalArgumentException("Required jdbc Url not set.");
+ }
+
+ Properties properties = new Properties();
+ String username = jdbcSinkConfig.getUserName();
+ String password = jdbcSinkConfig.getPassword();
+ if (username != null) {
+ properties.setProperty("user", username);
+ }
+ if (password != null) {
+ properties.setProperty("password", password);
+ }
+
+ connection = JdbcUtils.getConnection(jdbcUrl, properties);
+ connection.setAutoCommit(false);
+ log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
+
+ schema = jdbcSinkConfig.getSchema();
+ tableName = jdbcSinkConfig.getTableName();
+ tableId = JdbcUtils.getTableId(connection, tableName);
+ tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+ insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
+
+ timeoutMs = jdbcSinkConfig.getTimeoutMs();
+ batchSize = jdbcSinkConfig.getBatchSize();
+ incomingList = Lists.newArrayList();
+ swapList = Lists.newArrayList();
+ isFlushing = new AtomicBoolean(false);
+
+ flushExecutor = Executors.newScheduledThreadPool(1);
+ flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!connection.getAutoCommit()) {
+ connection.commit();
+ }
+ flushExecutor.shutdown();
+ if (connection != null) {
+ connection.close();
+ }
+ log.info("Closed jdbc connection: {}", jdbcUrl);
+ }
+
+ @Override
+ public void write(Record<T> record) throws Exception {
+ int number;
+ synchronized (incomingList) {
+ incomingList.add(record);
+ number = incomingList.size();
+ }
+
+ if (number == batchSize) {
+ flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ // bind value with a PreparedStetement
+ public abstract void bindValue(
+ PreparedStatement statement,
+ Record<T> message) throws Exception;
+
+
+ private void flush() {
+ // if not in flushing state, do flush, else return;
+ if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting flush, queue size: {}", incomingList.size());
+ }
+ checkState(swapList.isEmpty(),
+ "swapList should be empty since last flush. swapList.size: " + swapList.size());
+
+ synchronized (incomingList) {
+ List<Record<T>> tmpList;
+ swapList.clear();
+
+ tmpList = swapList;
+ swapList = incomingList;
+ incomingList = tmpList;
+ }
+
+ int updateCount = 0;
+ boolean noInfo = false;
+ try {
+ // bind each record value
+ for (Record<T> record : swapList) {
+ bindValue(insertStatement, record);
+ insertStatement.addBatch();
+ record.ack();
+ }
+
+ for (int updates : insertStatement.executeBatch()) {
+ if (updates == Statement.SUCCESS_NO_INFO) {
+ noInfo = true;
+ continue;
+ }
+ updateCount += updateCount;
+ }
+ connection.commit();
+ swapList.forEach(tRecord -> tRecord.ack());
+ } catch (Exception e) {
+ log.error("Got exception ", e);
+ swapList.forEach(tRecord -> tRecord.fail());
+ }
+
+ if (swapList.size() != updateCount) {
+ log.error("Update count {} not match total number of records {}", updateCount, swapList.size());
+ }
+
+ // finish flush
+ if (log.isDebugEnabled()) {
+ log.debug("Finish flush, queue size: {}", swapList.size());
+ }
+ isFlushing.set(false);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
+ }
+ }
+ }
+
+}
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
new file mode 100644
index 0000000..ec28220
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.sql.PreparedStatement;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
+
+/**
+ * A simple JDBC sink which assumes each input record's value is one Avro record, binary-encoded with the
+ * Avro schema given in the {@code schema} config. Each table column is filled from the Avro field with the
+ * same name.
+ */
+@Slf4j
+public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
+
+    private Schema avroSchema = null;
+    private DatumReader<GenericRecord> reader = null;
+
+    /**
+     * Opens the underlying JDBC sink and builds the Avro reader for the configured schema.
+     *
+     * @throws IllegalArgumentException if the {@code schema} config is not set
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        super.open(config, sinkContext);
+        if (schema == null) {
+            throw new IllegalArgumentException("Required schema not set.");
+        }
+        // get reader, and read value out as GenericRecord
+        if (avroSchema == null || reader == null) {
+            // Schema.Parser replaces the deprecated static Schema.parse(String).
+            avroSchema = new Schema.Parser().parse(schema);
+            reader = new GenericDatumReader<>(avroSchema);
+        }
+        log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString());
+    }
+
+    /**
+     * Decodes the record value and binds one parameter per table column, in column order.
+     * A null field value is bound as SQL NULL using the column's JDBC type.
+     */
+    @Override
+    public void bindValue(PreparedStatement statement,
+                          Record<byte[]> message) throws Exception {
+        byte[] value = message.getValue();
+        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+
+        int index = 1;
+        for (ColumnId columnId : tableDefinition.getColumns()) {
+            Object obj = record.get(columnId.getName());
+            // Per-value logging is debug-only: it runs for every column of every record.
+            if (log.isDebugEnabled()) {
+                log.debug("set column value: {}", obj);
+            }
+            setColumnValue(statement, index++, obj, columnId.getType());
+        }
+    }
+
+    /**
+     * Binds {@code value} at parameter {@code index} using the setter that matches its Java type.
+     *
+     * @param sqlType the column's {@link java.sql.Types} code, used when {@code value} is null
+     * @throws Exception if the value's type is not supported
+     */
+    private static void setColumnValue(PreparedStatement statement, int index, Object value, int sqlType)
+            throws Exception {
+        if (value == null) {
+            statement.setNull(index, sqlType);
+        } else if (value instanceof Integer) {
+            statement.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            statement.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            statement.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            statement.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            statement.setBoolean(index, (Boolean) value);
+        } else if (value instanceof CharSequence) {
+            // Avro decodes strings as Utf8 by default, or as String when the schema sets avro.java.string.
+            statement.setString(index, value.toString());
+        } else if (value instanceof Short) {
+            statement.setShort(index, (Short) value);
+        } else {
+            throw new Exception("Not support value type, need to add it. " + value.getClass());
+        }
+    }
+}
+
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
new file mode 100644
index 0000000..3419811
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the JDBC sink, loaded from the sink's config map or from a YAML file.
+ *
+ * <p>{@code toString()} leaves out the password so the config can be logged without leaking it.
+ */
+@Data
+@ToString(exclude = "password")
+@Accessors(chain = true)
+public class JdbcSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Optional credentials, handed to the JDBC driver as the "user" / "password" connection properties.
+    private String userName;
+    private String password;
+    // Required. The sub-protocol after "jdbc:" (e.g. "sqlite", "mysql") selects the driver.
+    private String jdbcUrl;
+    // Required. Name of an existing table to insert into.
+    private String tableName;
+
+    // schema for input topic
+    private String schema;
+
+    // Optional
+    // Interval, in milliseconds, of the periodic flush.
+    private int timeoutMs = 500;
+    // Number of buffered records that triggers an immediate flush.
+    private int batchSize = 200;
+
+    /** Reads the config from a YAML file. */
+    public static JdbcSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
+    }
+
+    /** Builds the config from the sink's config map by round-tripping it through JSON. */
+    public static JdbcSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class);
+    }
+}
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
new file mode 100644
index 0000000..e959909
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+ @Data(staticConstructor = "of")
+ @Setter
+ @Getter
+ @EqualsAndHashCode
+ @ToString
+ public static class TableId {
+ private final String catalogName;
+ private final String schemaName;
+ private final String tableName;
+ }
+
+ @Data(staticConstructor = "of")
+ @Setter
+ @Getter
+ @EqualsAndHashCode
+ @ToString
+ public static class ColumnId {
+ private final TableId tableId;
+ private final String name;
+ // SQL type from java.sql.Types
+ private final int type;
+ private final String typeName;
+ // column position in table
+ private final int position;
+ }
+
+ @Data(staticConstructor = "of")
+ @Setter
+ @Getter
+ @EqualsAndHashCode
+ @ToString
+ public static class TableDefinition {
+ private final TableId tableId;
+ private final List<ColumnId> columns;
+ }
+
+ /**
+ * Given a driver type(such as mysql), return its jdbc driver class name.
+ * TODO: test and support more types, also add Driver in pom file.
+ */
+ public static String getDriverClassName(String driver) throws Exception {
+ if (driver.equals("mysql")) {
+ return "com.mysql.jdbc.Driver";
+ } if (driver.equals("sqlite")) {
+ return "org.sqlite.JDBC";
+ } else {
+ throw new Exception("Not tested jdbc driver type: " + driver);
+ }
+ }
+
+ /**
+ * Get the {@link Connection} for the given jdbcUrl.
+ */
+ public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception {
+ String driver = jdbcUrl.split(":")[1];
+ String driverClassName = getDriverClassName(driver);
+ Class.forName(driverClassName);
+
+ return DriverManager.getConnection(jdbcUrl, properties);
+ }
+
+ /**
+ * Get the {@link TableId} for the given tableName.
+ */
+ public static TableId getTableId(Connection connection, String tableName) throws Exception {
+ DatabaseMetaData metadata = connection.getMetaData();
+ try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
+ if (rs.next()) {
+ String catalogName = rs.getString(1);
+ String schemaName = rs.getString(2);
+ String gotTableName = rs.getString(3);
+ checkState(tableName.equals(gotTableName),
+ "TableName not match: " + tableName + " Got: " + gotTableName);
+ if (log.isDebugEnabled()) {
+ log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
+ }
+ return TableId.of(catalogName, schemaName, tableName);
+ } else {
+ throw new Exception("Not able to find table: " + tableName);
+ }
+ }
+ }
+
+ /**
+ * Get the {@link TableDefinition} for the given table.
+ */
+ public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception {
+ TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList());
+
+ try (ResultSet rs = connection.getMetaData().getColumns(
+ tableId.getCatalogName(),
+ tableId.getSchemaName(),
+ tableId.getTableName(),
+ null
+ )) {
+ while (rs.next()) {
+ final String columnName = rs.getString(4);
+
+ final int sqlDataType = rs.getInt(5);
+ final String typeName = rs.getString(6);
+ final int position = rs.getInt(17);
+
+ table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position));
+ if (log.isDebugEnabled()) {
+ log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position);
+ }
+ }
+ return table;
+ }
+ }
+
+ public static String buildInsertSql(TableDefinition table) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(table.tableId.getTableName());
+ builder.append("(");
+
+ table.columns.forEach(columnId -> builder.append(columnId.getName()).append(","));
+ builder.deleteCharAt(builder.length() - 1);
+
+ builder.append(") VALUES(");
+ IntStream.range(0, table.columns.size() - 1).forEach(i -> builder.append("?,"));
+ builder.append("?)");
+
+ return builder.toString();
+ }
+
+ public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException {
+ return connection.prepareStatement(insertSQL);
+ }
+
+}
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..d9d06bd
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: jdbc
+description: Jdbc sink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
new file mode 100644
index 0000000..33bb859
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Jdbc Sink test
+ */
+@Slf4j
+public class JdbcSinkTest {
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    /**
+     * A Simple class to test jdbc class
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sqliteUtils.tearDown();
+    }
+
+    /**
+     * Opens the Avro sink against a SQLite table, writes one record and checks the inserted row.
+     */
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        String tableName = "TestOpenAndWriteSink";
+
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", sqliteUtils.sqliteUri());
+        conf.put("tableName", tableName);
+
+        sqliteUtils.createTable(
+            "CREATE TABLE " + tableName + "(" +
+                " field1 TEXT," +
+                " field2 TEXT," +
+                " field3 INTEGER," +
+                "PRIMARY KEY (field1));"
+        );
+
+        // prepare a foo Record. Every field gets a distinct value so that swapped columns are detected.
+        Foo obj = new Foo();
+        obj.setField1("ValueOfField1");
+        obj.setField2("ValueOfField2");
+        obj.setField3(3);
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+        conf.put("schema", new String(schema.getSchemaInfo().getSchema()));
+        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
+        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // change batchSize to 1, to flush on each write.
+        conf.put("batchSize", 1);
+
+        JdbcAvroSchemaSink jdbcSink = new JdbcAvroSchemaSink();
+        // open should success
+        jdbcSink.open(conf, null);
+        // Close the sink even when the write or the verification fails.
+        try {
+            // write should success.
+            jdbcSink.write(record);
+            log.info("executed write");
+            // sleep to wait backend flush complete
+            Thread.sleep(500);
+
+            // value has been written to db, read it out and verify.
+            String querySql = "SELECT * FROM " + tableName;
+            sqliteUtils.select(querySql, (resultSet) -> {
+                Assert.assertEquals(obj.getField1(), resultSet.getString(1));
+                Assert.assertEquals(obj.getField2(), resultSet.getString(2));
+                Assert.assertEquals(obj.getField3(), resultSet.getInt(3));
+            });
+        } finally {
+            jdbcSink.close();
+        }
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
new file mode 100644
index 0000000..d58802d
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableId;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link JdbcUtils}, run against a throw-away SQLite database managed by {@link SqliteUtils}.
+ */
+@Slf4j
+public class JdbcUtilsTest {
+
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    @BeforeMethod
+    public void setUp() throws IOException, SQLException {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws IOException, SQLException {
+        sqliteUtils.tearDown();
+    }
+
+    /**
+     * Verifies table-id lookup, table-definition introspection and INSERT statement generation
+     * for a table with a composite primary key and a mix of SQLite column types.
+     */
+    @Test
+    public void TestGetTableId() throws Exception {
+        String tableName = "TestGetTableId";
+
+        sqliteUtils.createTable(
+            "CREATE TABLE " + tableName + "(" +
+            " firstName TEXT," +
+            " lastName TEXT," +
+            " age INTEGER," +
+            " bool NUMERIC," +
+            " byte INTEGER," +
+            " short INTEGER NULL," +
+            " long INTEGER," +
+            " float NUMERIC," +
+            " double NUMERIC," +
+            " bytes BLOB, " +
+            "PRIMARY KEY (firstName, lastName));"
+        );
+
+        // The connection is owned by sqliteUtils and closed in tearDown().
+        Connection connection = sqliteUtils.getConnection();
+
+        // Test getTableId
+        log.info("verify getTableId");
+        TableId id = JdbcUtils.getTableId(connection, tableName);
+        Assert.assertEquals(id.getTableName(), tableName);
+
+        // Test getTableDefinition: all 10 declared columns are reported, in declaration order.
+        log.info("verify getTableDefinition");
+        TableDefinition table = JdbcUtils.getTableDefinition(connection, id);
+        Assert.assertEquals(table.getColumns().size(), 10);
+        Assert.assertEquals(table.getColumns().get(0).getName(), "firstName");
+        Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT");
+        Assert.assertEquals(table.getColumns().get(2).getName(), "age");
+        Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER");
+        Assert.assertEquals(table.getColumns().get(7).getName(), "float");
+        Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC");
+
+        // Test buildInsertSql: one placeholder per column, columns in definition order.
+        log.info("verify buildInsertSql");
+        String expectedStatement = "INSERT INTO " + tableName +
+            "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
+            " VALUES(?,?,?,?,?,?,?,?,?,?)";
+        String statement = JdbcUtils.buildInsertSql(table);
+        Assert.assertEquals(statement, expectedStatement);
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
new file mode 100644
index 0000000..3b4a01a
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public final class SqliteUtils {
+
+ static {
+ try {
+ Class.forName("org.sqlite.JDBC");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public interface ResultSetReadCallback {
+ void read(final ResultSet rs) throws SQLException;
+ }
+
+ private final Path dbPath;
+
+ private Connection connection;
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public SqliteUtils(String testId) {
+ dbPath = Paths.get(testId + ".db");
+ }
+
+ public String sqliteUri() {
+ return "jdbc:sqlite:" + dbPath;
+ }
+
+ public void setUp() throws SQLException, IOException {
+ Files.deleteIfExists(dbPath);
+ connection = DriverManager.getConnection(sqliteUri());
+ connection.setAutoCommit(false);
+ }
+
+ public void tearDown() throws SQLException, IOException {
+ connection.close();
+ Files.deleteIfExists(dbPath);
+ }
+
+ public void createTable(final String createSql) throws SQLException {
+ execute(createSql);
+ }
+
+ public void deleteTable(final String table) throws SQLException {
+ execute("DROP TABLE IF EXISTS " + table);
+
+ //random errors of table not being available happens in the unit tests
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int select(final String query, final SqliteUtils.ResultSetReadCallback callback) throws SQLException {
+ int count = 0;
+ try (Statement stmt = connection.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery(query)) {
+ while (rs.next()) {
+ callback.read(rs);
+ count++;
+ }
+ }
+ }
+ return count;
+ }
+
+ public void execute(String sql) throws SQLException {
+ try (Statement stmt = connection.createStatement()) {
+ stmt.executeUpdate(sql);
+ connection.commit();
+ }
+ }
+
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index f1494db..e89cc02 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -38,6 +38,7 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>kinesis</module>
+ <module>jdbc</module>
<module>data-genenator</module>
</modules>
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index a3a4694..6d3fdc4 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -78,6 +78,27 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-jdbc.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 67136a6..5cefc6a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -36,18 +36,22 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
import org.apache.pulsar.tests.integration.io.SinkTester;
import org.apache.pulsar.tests.integration.io.SourceTester;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
import org.testng.annotations.Test;
/**
@@ -70,15 +74,20 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
testSink(new CassandraSinkTester());
}
+ @Test
+ public void testJdbcSink() throws Exception {
+ testSink(new JdbcSinkTester());
+ }
+
private void testSink(SinkTester tester) throws Exception {
tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String inputTopicName = "test-sink-connector-"
- + functionRuntimeType + "-input-topic-" + randomName(8);
+ + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8);
final String sinkName = "test-sink-connector-"
- + functionRuntimeType + "-name-" + randomName(8);
+ + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8);
final int numMessages = 20;
// prepare the testing environment for sink
@@ -94,7 +103,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
getSinkStatus(tenant, namespace, sinkName);
// produce messages
- Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+ Map<String, String> kvs;
+ if (tester instanceof JdbcSinkTester) {
+ kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class));
+ } else {
+ kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+ }
// wait for sink to process messages
waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
@@ -202,6 +216,36 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
return kvs;
}
+    /**
+     * Produces {@code numMessages} schema-encoded {@link Foo} records to the given topic; used by
+     * {@link JdbcSinkTester}.
+     *
+     * <p>Message {@code i} has key {@code "key-" + i} and carries a {@code Foo} with
+     * {@code field1 = "field1_" + i}, {@code field2 = "field2_" + i} and {@code field3 = i}.
+     *
+     * @param inputTopicName topic to produce to
+     * @param numMessages number of messages to produce
+     * @param schema schema used to encode each {@code Foo}
+     * @return the produced key/value pairs, in production order
+     */
+    protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+                                                                    int numMessages,
+                                                                    Schema<Foo> schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+
+            Foo obj = new Foo();
+            obj.setField1("field1_" + i);
+            obj.setField2("field2_" + i);
+            obj.setField3(i);
+            // NOTE(review): the encoded bytes are carried as a String decoded with the platform
+            // charset. This round-trips only while every encoded byte is valid in that charset
+            // (presumably true for the short ASCII values used here — TODO confirm).
+            // JdbcSinkTester decodes with value.getBytes() in the same platform charset.
+            String value = new String(schema.encode(obj));
+
+            kvs.put(key, value);
+            producer.newMessage()
+                .key(key)
+                .value(value)
+                .send();
+        }
+        return kvs;
+    }
+
protected void waitForProcessingMessages(String tenant,
String namespace,
String sinkName,
@@ -226,8 +270,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// expected in early iterations
}
- log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
- stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ log.info("{} ms has elapsed but the sink {} hasn't process {} messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), sinkName, numMessages);
TimeUnit.SECONDS.sleep(1);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index fe4795d..7a47f77 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -52,7 +52,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
@BeforeClass
public void setupFunctionWorkers() {
- final int numFunctionWorkers = 2;
+ final int numFunctionWorkers = 3;
log.info("Setting up {} function workers : function runtime type = {}",
numFunctionWorkers, functionRuntimeType);
pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
new file mode 100644
index 0000000..6a102f1
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * A tester for testing jdbc sink.
+ * This will use MySql as DB server
+ */
+@Slf4j
+public class JdbcSinkTester extends SinkTester {
+
+ /**
+ * A Simple class to test jdbc class,
+ *
+ */
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ private String field1;
+ private String field2;
+ private int field3;
+ }
+
+ private static final String NAME = "jdbc";
+
+ private MySQLContainer mySQLContainer;
+ private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+ private String tableName = "test";
+ private Connection connection;
+
+ public JdbcSinkTester() {
+ super(NAME);
+
+ // container default value is test
+ sinkConfig.put("userName", "test");
+ sinkConfig.put("password", "test");
+ sinkConfig.put("tableName", tableName);
+
+ // prepare schema
+ sinkConfig.put("schema", new String(schema.getSchemaInfo().getSchema()));
+ log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
+ sinkConfig.put("batchSize", 1);
+ }
+
+ @Override
+ public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+ GenericContainer<?> container = containers.get("mysql");
+ checkState(container instanceof MySQLContainer,
+ "No MySQL service found in the cluster");
+
+ this.mySQLContainer = (MySQLContainer) container;
+ log.info("find sink service container: {}", mySQLContainer.getContainerName());
+ }
+
+ @Override
+ public void prepareSink() throws Exception {
+ String jdbcUrl = mySQLContainer.getJdbcUrl();
+ // we need set mysql server address in cluster network.
+ sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
+ String driver = mySQLContainer.getDriverClassName();
+ Class.forName(driver);
+
+ connection = DriverManager.getConnection(jdbcUrl, "test", "test");
+ log.info("getConnection: {}, jdbcurl: {}", connection, jdbcUrl);
+
+ // create table
+ String createTable = "CREATE TABLE " + tableName +
+ " (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY (field3))";
+ int ret = connection.createStatement().executeUpdate(createTable);
+ log.info("created table in jdbc: {}, return value: {}", createTable, ret);
+ }
+
+ @Override
+ public void validateSinkResult(Map<String, String> kvs) {
+ log.info("Query table content from mysql server: {}", tableName);
+ String querySql = "SELECT * FROM " + tableName;
+ ResultSet rs;
+ try {
+ // backend flush may not complete.
+ Thread.sleep(1000);
+
+ PreparedStatement statement = connection.prepareStatement(querySql);
+ rs = statement.executeQuery();
+
+ while (rs.next()) {
+ String field1 = rs.getString(1);
+ String field2 = rs.getString(2);
+ int field3 = rs.getInt(3);
+
+ String value = kvs.get("key-" + field3);
+
+ Foo obj = schema.decode(value.getBytes());
+ assertEquals(obj.field1, field1);
+ assertEquals(obj.field2, field2);
+ assertEquals(obj.field3, field3);
+ }
+ } catch (Exception e) {
+ log.error("Got exception: ", e);
+ fail("Got exception when op sql.");
+ return;
+ }
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index e20a933..147f273 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarCl
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.MySQLContainer;
import org.testng.ITest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
@@ -51,6 +52,7 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
// register external services
Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+
final String kafkaServiceName = "kafka";
externalServices.put(
kafkaServiceName,
@@ -60,10 +62,19 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
.withName(kafkaServiceName)
.withHostName(clusterName + "-" + kafkaServiceName)));
+
final String cassandraServiceName = "cassandra";
externalServices.put(
cassandraServiceName,
new CassandraContainer(clusterName));
+
+ // use mySQL for jdbc test
+ final String jdbcServiceName = "mysql";
+ externalServices.put(
+ jdbcServiceName,
+ new MySQLContainer()
+ .withExposedPorts(3306));
+
builder = builder.externalServices(externalServices);
return builder;