You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/30 10:27:25 UTC
[pulsar] branch master updated: [improve][connector] JDBC Sinks: support KeyValue schema (#15807)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 29308e48b78 [improve][connector] JDBC Sinks: support KeyValue schema (#15807)
29308e48b78 is described below
commit 29308e48b78a01959ee8067dbcac67d0e903e8a4
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon May 30 12:27:14 2022 +0200
[improve][connector] JDBC Sinks: support KeyValue schema (#15807)
* [improve][connector] JDBC Sinks: support KeyValue schema
* revert JDK
* debug
* checkstyle and itest
* fix test
* better error message
---
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 88 ++++++++++++-
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 8 +-
pulsar-io/jdbc/sqlite/pom.xml | 5 +
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 140 ++++++++++++++++++---
.../io/sinks/JdbcPostgresSinkTester.java | 88 ++++++++++---
.../integration/io/sinks/PulsarIOSinkRunner.java | 2 +-
.../integration/io/sinks/PulsarSinksTest.java | 6 +-
7 files changed, 296 insertions(+), 41 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index d45be932797..a4a746ebd89 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -19,11 +19,20 @@
package org.apache.pulsar.io.jdbc;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import java.sql.PreparedStatement;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
@@ -31,13 +40,32 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
* An abstract Jdbc sink, which interprets input Record in generic record.
*/
@Slf4j
-public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
+public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObject> {
@Override
public void bindValue(PreparedStatement statement,
- Record<GenericRecord> message, String action) throws Exception {
+ Record<GenericObject> message, String action) throws Exception {
+ final GenericObject record = message.getValue();
+ Function<String, Object> recordValueGetter;
+ if (message.getSchema() != null && message.getSchema() instanceof KeyValueSchema) {
+ KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) message.getSchema();
+
+ final org.apache.pulsar.client.api.Schema<GenericObject> keySchema = keyValueSchema.getKeySchema();
+ final org.apache.pulsar.client.api.Schema<GenericObject> valueSchema = keyValueSchema.getValueSchema();
+ KeyValue<GenericObject, GenericObject> keyValue =
+ (KeyValue<GenericObject, GenericObject>) record.getNativeObject();
+
+ final GenericObject key = keyValue.getKey();
+ final GenericObject value = keyValue.getValue();
+
+ Map<String, Object> data = new HashMap<>();
+ fillKeyValueSchemaData(keySchema, key, data);
+ fillKeyValueSchemaData(valueSchema, value, data);
+ recordValueGetter = (k) -> data.get(k);
+ } else {
+ recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+ }
- GenericRecord record = message.getValue();
List<ColumnId> columns = Lists.newArrayList();
if (action == null || action.equals(INSERT)) {
columns = tableDefinition.getColumns();
@@ -56,7 +84,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
log.debug("colName: {} colType: {}", colName, colType);
}
try {
- Object obj = record.getField(colName);
+ Object obj = recordValueGetter.apply(colName);
if (obj != null) {
setColumnValue(statement, index++, obj);
} else {
@@ -73,7 +101,6 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
}
setColumnNull(statement, index++, colType);
}
-
}
}
@@ -107,5 +134,56 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
throw new Exception("Not support value type, need to add it. " + value.getClass());
}
}
+
+ private Object getValueFromJsonNode(final JsonNode fn) {
+ if (fn == null || fn.isNull()) {
+ return null;
+ }
+ if (fn.isContainerNode()) {
+ throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only "
+ + "first level fields.");
+ } else if (fn.isBoolean()) {
+ return fn.asBoolean();
+ } else if (fn.isFloatingPointNumber()) {
+ return fn.asDouble();
+ } else if (fn.isBigInteger()) {
+ if (fn.canConvertToLong()) {
+ return fn.asLong();
+ } else {
+ return fn.asText();
+ }
+ } else if (fn.isNumber()) {
+ return fn.numberValue();
+ } else {
+ return fn.asText();
+ }
+ }
+
+ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
+ GenericObject record,
+ Map<String, Object> data) {
+ switch (schema.getSchemaInfo().getType()) {
+ case JSON:
+ final JsonNode jsonNode = (JsonNode) record.getNativeObject();
+ final Iterator<String> fieldNames = jsonNode.fieldNames();
+ while (fieldNames.hasNext()) {
+ String fieldName = fieldNames.next();
+ final JsonNode nodeValue = jsonNode.get(fieldName);
+ data.put(fieldName, getValueFromJsonNode(nodeValue));
+ }
+ break;
+ case AVRO:
+ org.apache.avro.generic.GenericRecord avroNode =
+ (org.apache.avro.generic.GenericRecord) record.getNativeObject();
+ for (Schema.Field field : avroNode.getSchema().getFields()) {
+ data.put(field.name(), avroNode.get(field.name()));
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("unexpected schema type: "
+ + schema.getSchemaInfo().getType()
+ + " with KeyValueSchema");
+ }
+ }
}
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 3bc1fe16e7f..0fba57dcb29 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -133,12 +133,16 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void close() throws Exception {
- if (!connection.getAutoCommit()) {
+ if (connection != null && !connection.getAutoCommit()) {
connection.commit();
}
- flushExecutor.shutdown();
+ if (flushExecutor != null) {
+ flushExecutor.shutdown();
+ flushExecutor = null;
+ }
if (connection != null) {
connection.close();
+ connection = null;
}
log.info("Closed jdbc connection: {}", jdbcUrl);
}
diff --git a/pulsar-io/jdbc/sqlite/pom.xml b/pulsar-io/jdbc/sqlite/pom.xml
index d235930f112..2facee2aa50 100644
--- a/pulsar-io/jdbc/sqlite/pom.xml
+++ b/pulsar-io/jdbc/sqlite/pom.xml
@@ -54,6 +54,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d6242740f5c..d92d437f664 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -24,22 +24,31 @@ import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -66,7 +75,7 @@ public class SqliteJdbcSinkTest {
private int field3;
}
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
sqliteUtils.setUp();
sqliteUtils.createTable(
@@ -111,7 +120,7 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
- Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+ Message<GenericObject> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
@@ -123,7 +132,7 @@ public class SqliteJdbcSinkTest {
byte[] insertBytes = schema.encode(insertObj);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -155,7 +164,7 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSinkJson(Map<String, String> actionProperties) throws Exception {
- Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+ Message<GenericObject> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
@@ -166,7 +175,7 @@ public class SqliteJdbcSinkTest {
byte[] insertBytes = schema.encode(insertObj);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -198,7 +207,7 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSinkNullValueJson(Map<String, String> actionProperties) throws Exception {
- Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+ Message<GenericObject> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
@@ -210,7 +219,7 @@ public class SqliteJdbcSinkTest {
byte[] insertBytes = schema.encode(insertObj);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -228,6 +237,7 @@ public class SqliteJdbcSinkTest {
jdbcSink.write(insertRecord);
log.info("executed write");
// sleep to wait backend flush complete
+ // sleep to wait backend flush complete
future.get(1, TimeUnit.SECONDS);
// value has been written to db, read it out and verify.
@@ -242,7 +252,7 @@ public class SqliteJdbcSinkTest {
}
private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
- Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+ Message<GenericObject> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
@@ -253,7 +263,7 @@ public class SqliteJdbcSinkTest {
byte[] insertBytes = schema.encode(insertObj);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -311,7 +321,7 @@ public class SqliteJdbcSinkTest {
@Test
public void TestUnknownAction() throws Exception {
- Record<GenericRecord> recordRecord = mock(Record.class);
+ Record<GenericObject> recordRecord = mock(Record.class);
when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));
CompletableFuture<Void> future = new CompletableFuture<>();
doAnswer(a -> future.complete(null)).when(recordRecord).fail();
@@ -330,9 +340,9 @@ public class SqliteJdbcSinkTest {
updateObj.setField3(4);
byte[] updateBytes = schema.encode(updateObj);
- Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+ Message<GenericObject> updateMessage = mock(MessageImpl.class);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
.message(updateMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -371,9 +381,9 @@ public class SqliteJdbcSinkTest {
deleteObj.setField3(5);
byte[] deleteBytes = schema.encode(deleteObj);
- Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+ Message<GenericObject> deleteMessage = mock(MessageImpl.class);
CompletableFuture<Void> future = new CompletableFuture<>();
- Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
+ Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
.message(deleteMessage)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
@@ -398,4 +408,106 @@ public class SqliteJdbcSinkTest {
Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> {}), 0);
}
+ @DataProvider(name = "schemaType")
+ public Object[] schemaType() {
+ return new Object[]{SchemaType.JSON, SchemaType.AVRO};
+ }
+
+
+ @Test(dataProvider = "schemaType")
+ public void testKeyValueSchema(SchemaType schemaType) throws Exception {
+ RecordSchemaBuilder keySchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+ keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(schemaType));
+ GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+ .set("key", "mykey")
+ .build();
+
+ RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+ valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+ valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
+ valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+ valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
+ valueSchemaBuilder.field("float").type(SchemaType.FLOAT).optional().defaultValue(null);
+ valueSchemaBuilder.field("long").type(SchemaType.INT64).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+ .set("string", "thestring")
+ .set("int", Integer.MAX_VALUE)
+ .set("bool", true)
+ .set("double", Double.MAX_VALUE)
+ .set("float", Float.MAX_VALUE)
+ .set("long", Long.MIN_VALUE)
+ .build();
+
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema = Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+ KeyValue<GenericRecord, GenericRecord> keyValue = new KeyValue<>(keyGenericRecord, valueGenericRecord);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValue;
+ }
+ };
+ Record<GenericObject> genericObjectRecord = new Record<>() {
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("topic");
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema getSchema() {
+ return keyValueSchema;
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+ };
+ jdbcSink.close();
+ sqliteUtils.createTable(
+ "CREATE TABLE kvtable (" +
+ " key TEXT," +
+ " int INTEGER," +
+ " nulltext TEXT," +
+ " bool NUMERIC," +
+ " double NUMERIC," +
+ " float NUMERIC," +
+ " long INTEGER," +
+ "PRIMARY KEY (key));"
+ );
+ String jdbcUrl = sqliteUtils.sqliteUri();
+
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", "kvtable");
+ conf.put("key", "key");
+ conf.put("nonKey", "long,int,double,float,bool,nulltext");
+ // change batchSize to 1, to flush on each write.
+ conf.put("batchSize", 1);
+ try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new SqliteJdbcAutoSchemaSink();) {
+ kvSchemaJdbcSink.open(conf, null);
+ kvSchemaJdbcSink.write(genericObjectRecord);
+
+ Awaitility.await().untilAsserted(() -> {
+ final int count = sqliteUtils.select("select int,bool,double,float,long,nulltext from kvtable where key='mykey'", (resultSet) -> {
+ int index = 1;
+ Assert.assertEquals(resultSet.getInt(index++), Integer.MAX_VALUE);
+ Assert.assertEquals(resultSet.getBoolean(index++), true);
+ Assert.assertEquals(resultSet.getDouble(index++), Double.MAX_VALUE);
+ Assert.assertEquals(resultSet.getFloat(index++), Float.MAX_VALUE);
+ Assert.assertEquals(resultSet.getLong(index++), Long.MIN_VALUE);
+ Assert.assertNull(resultSet.getString(index++));
+ });
+ Assert.assertEquals(count, 1);
+ });
+ }
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
index a7be52fc372..e67626580b7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
@@ -22,12 +22,19 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.util.LinkedHashMap;
import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -42,9 +49,6 @@ import static org.testng.Assert.fail;
@Slf4j
public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
- /**
- * A Simple class to test jdbc class,
- */
@Data
public static class Foo {
private String field1;
@@ -52,15 +56,30 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
private int field3;
}
+ @Data
+ @AllArgsConstructor
+ public static class KVSchemaKey {
+ private int field3;
+ }
+
+ @Data
+ @AllArgsConstructor
+ public static class KVSchemaValue {
+ private String field1;
+ private String field2;
+ }
+
private static final String NAME = "jdbc-postgres";
private static final String POSTGRES = "postgres";
private final AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
private final String tableName = "test";
private Connection connection;
+ private boolean keyValueSchema;
- public JdbcPostgresSinkTester() {
+ public JdbcPostgresSinkTester(boolean keyValueSchema) {
super(NAME, SinkType.JDBC_POSTGRES);
+ this.keyValueSchema = keyValueSchema;
// container default value is test
sinkConfig.put("userName", "test");
@@ -73,12 +92,16 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
@Override
public Schema<?> getInputTopicSchema() {
- return schema;
+ if (keyValueSchema) {
+ return Schema.AUTO_CONSUME();
+ } else {
+ return schema;
+ }
}
@Override
protected PostgreSQLContainer createSinkService(PulsarCluster cluster) {
- return (PostgreSQLContainer) new PostgreSQLContainer()
+ return (PostgreSQLContainer) new PostgreSQLContainer("postgres:14.3")
.withNetworkAliases(POSTGRES);
}
@@ -100,10 +123,33 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
log.info("created table in jdbc: {}, return value: {}", createTable, ret);
}
+ @Override
+ public void produceMessage(int numMessages, PulsarClient client, String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
+ if (!keyValueSchema) {
+ super.produceMessage(numMessages, client, inputTopicName, kvs);
+ return;
+ }
+
+ @Cleanup
+ Producer<KeyValue<KVSchemaKey, KVSchemaValue>> producer = client.newProducer(Schema.KeyValue(Schema.JSON(KVSchemaKey.class),
+ Schema.AVRO(KVSchemaValue.class), KeyValueEncodingType.SEPARATED))
+ .topic(inputTopicName)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+ kvs.put(key, key);
+ producer.newMessage()
+ .value(new KeyValue<>(new KVSchemaKey(i), new KVSchemaValue("f1_" + i, "f2_" + i)))
+ .send();
+ }
+
+ }
+
@Override
public void validateSinkResult(Map<String, String> kvs) {
log.info("Query table content from postgres server: {}", tableName);
- String querySql = "SELECT * FROM " + tableName;
+ String querySql = "SELECT * FROM " + tableName + " ORDER BY field3";
ResultSet rs;
try {
// backend flush may not complete.
@@ -114,25 +160,35 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
ResultSet.CONCUR_UPDATABLE);
rs = statement.executeQuery();
- if (kvs.get("ACTION").equals("DELETE")) {
+ if (!keyValueSchema && kvs.get("ACTION").equals("DELETE")) {
assertFalse(rs.first());
return;
}
+ int index = 0;
while (rs.next()) {
String field1 = rs.getString(1);
String field2 = rs.getString(2);
int field3 = rs.getInt(3);
-
- String value = kvs.get("key-" + field3);
-
- Foo obj = schema.decode(value.getBytes());
- assertEquals(obj.field1, field1);
- assertEquals(obj.field2, field2);
- assertEquals(obj.field3, field3);
+ if (keyValueSchema) {
+ assertEquals(field1, "f1_" + index);
+ assertEquals(field2, "f2_" + index);
+ assertEquals(field3, index);
+ } else {
+ String value = kvs.get("key-" + field3);
+ Foo obj = schema.decode(value.getBytes());
+ assertEquals(obj.field1, field1);
+ assertEquals(obj.field2, field2);
+ assertEquals(obj.field3, field3);
+ }
+ index++;
}
} catch (Exception e) {
log.error("Got exception: ", e);
- fail("Got exception when op sql.");
+ fail("Got exception when op sql: " + e);
}
}
+
+ public boolean isKeyValueSchema() {
+ return keyValueSchema;
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index b6500e36a84..5fa96d77565 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -85,7 +85,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
// produce messages
Map<String, String> kvs;
- if (tester instanceof JdbcPostgresSinkTester) {
+ if (tester instanceof JdbcPostgresSinkTester && !((JdbcPostgresSinkTester)tester).isKeyValueSchema()) {
kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcPostgresSinkTester.Foo.class));
// wait for sink to process messages
Failsafe.with(statusRetryPolicy).run(() ->
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index 04eda16c57b..bec9c99e8a8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -55,9 +55,9 @@ public class PulsarSinksTest extends PulsarIOTestBase {
testSink(new HdfsSinkTester(), false);
}
- @Test(groups = "sink")
- public void testJdbcSink() throws Exception {
- testSink(new JdbcPostgresSinkTester(), true);
+ @Test(groups = "sink", dataProvider = "withSchema")
+ public void testJdbcSink(boolean kvSchema) throws Exception {
+ testSink(new JdbcPostgresSinkTester(kvSchema), true);
}
@Test(groups = "sink", dataProvider = "withSchema")