You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/30 10:27:25 UTC

[pulsar] branch master updated: [improve][connector] JDBC Sinks: support KeyValue schema (#15807)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 29308e48b78 [improve][connector] JDBC Sinks: support KeyValue schema (#15807)
29308e48b78 is described below

commit 29308e48b78a01959ee8067dbcac67d0e903e8a4
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon May 30 12:27:14 2022 +0200

    [improve][connector] JDBC Sinks: support KeyValue schema (#15807)
    
    * [improve][connector] JDBC Sinks: support KeyValue schema
    
    * revert JDK
    
    * debug
    
    * checkstyle and itest
    
    * fix test
    
    * better error message
---
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     |  88 ++++++++++++-
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |   8 +-
 pulsar-io/jdbc/sqlite/pom.xml                      |   5 +
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 140 ++++++++++++++++++---
 .../io/sinks/JdbcPostgresSinkTester.java           |  88 ++++++++++---
 .../integration/io/sinks/PulsarIOSinkRunner.java   |   2 +-
 .../integration/io/sinks/PulsarSinksTest.java      |   6 +-
 7 files changed, 296 insertions(+), 41 deletions(-)

diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index d45be932797..a4a746ebd89 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -19,11 +19,20 @@
 
 package org.apache.pulsar.io.jdbc;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Lists;
 import java.sql.PreparedStatement;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
@@ -31,13 +40,32 @@ import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
  * An abstract Jdbc sink, which interprets input Record in generic record.
  */
 @Slf4j
-public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
+public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObject> {
 
     @Override
     public void bindValue(PreparedStatement statement,
-                          Record<GenericRecord> message, String action) throws Exception {
+                          Record<GenericObject> message, String action) throws Exception {
+        final GenericObject record = message.getValue();
+        Function<String, Object> recordValueGetter;
+        if (message.getSchema() != null && message.getSchema() instanceof KeyValueSchema) {
+            KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) message.getSchema();
+
+            final org.apache.pulsar.client.api.Schema<GenericObject> keySchema = keyValueSchema.getKeySchema();
+            final org.apache.pulsar.client.api.Schema<GenericObject> valueSchema = keyValueSchema.getValueSchema();
+            KeyValue<GenericObject, GenericObject> keyValue =
+                    (KeyValue<GenericObject, GenericObject>) record.getNativeObject();
+
+            final GenericObject key = keyValue.getKey();
+            final GenericObject value = keyValue.getValue();
+
+            Map<String, Object> data = new HashMap<>();
+            fillKeyValueSchemaData(keySchema, key, data);
+            fillKeyValueSchemaData(valueSchema, value, data);
+            recordValueGetter = (k) -> data.get(k);
+        } else {
+            recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+        }
 
-        GenericRecord record = message.getValue();
         List<ColumnId> columns = Lists.newArrayList();
         if (action == null || action.equals(INSERT)) {
             columns = tableDefinition.getColumns();
@@ -56,7 +84,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
                 log.debug("colName: {} colType: {}", colName, colType);
             }
             try {
-                Object obj = record.getField(colName);
+                Object obj = recordValueGetter.apply(colName);
                 if (obj != null) {
                     setColumnValue(statement, index++, obj);
                 } else {
@@ -73,7 +101,6 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
                 }
                 setColumnNull(statement, index++, colType);
             }
-
         }
     }
 
@@ -107,5 +134,56 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRec
             throw new Exception("Not support value type, need to add it. " + value.getClass());
         }
     }
+
+    private Object getValueFromJsonNode(final JsonNode fn) {
+        if (fn == null || fn.isNull()) {
+            return null;
+        }
+        if (fn.isContainerNode()) {
+            throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only "
+                    + "first level fields.");
+        } else if (fn.isBoolean()) {
+            return fn.asBoolean();
+        } else if (fn.isFloatingPointNumber()) {
+            return fn.asDouble();
+        } else if (fn.isBigInteger()) {
+            if (fn.canConvertToLong()) {
+                return fn.asLong();
+            } else {
+                return fn.asText();
+            }
+        } else if (fn.isNumber()) {
+            return fn.numberValue();
+        } else {
+            return fn.asText();
+        }
+    }
+
+    private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
+                                        GenericObject record,
+                                        Map<String, Object> data) {
+        switch (schema.getSchemaInfo().getType()) {
+            case JSON:
+                final JsonNode jsonNode = (JsonNode) record.getNativeObject();
+                final Iterator<String> fieldNames = jsonNode.fieldNames();
+                while (fieldNames.hasNext()) {
+                    String fieldName = fieldNames.next();
+                    final JsonNode nodeValue = jsonNode.get(fieldName);
+                    data.put(fieldName, getValueFromJsonNode(nodeValue));
+                }
+                break;
+            case AVRO:
+                org.apache.avro.generic.GenericRecord avroNode =
+                        (org.apache.avro.generic.GenericRecord) record.getNativeObject();
+                for (Schema.Field field : avroNode.getSchema().getFields()) {
+                    data.put(field.name(), avroNode.get(field.name()));
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("unexpected schema type: "
+                        + schema.getSchemaInfo().getType()
+                        + " with KeyValueSchema");
+        }
+    }
 }
 
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 3bc1fe16e7f..0fba57dcb29 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -133,12 +133,16 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
     @Override
     public void close() throws Exception {
-        if (!connection.getAutoCommit()) {
+        if (connection != null && !connection.getAutoCommit()) {
             connection.commit();
         }
-        flushExecutor.shutdown();
+        if (flushExecutor != null) {
+            flushExecutor.shutdown();
+            flushExecutor = null;
+        }
         if (connection != null) {
             connection.close();
+            connection = null;
         }
         log.info("Closed jdbc connection: {}", jdbcUrl);
     }
diff --git a/pulsar-io/jdbc/sqlite/pom.xml b/pulsar-io/jdbc/sqlite/pom.xml
index d235930f112..2facee2aa50 100644
--- a/pulsar-io/jdbc/sqlite/pom.xml
+++ b/pulsar-io/jdbc/sqlite/pom.xml
@@ -54,6 +54,11 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d6242740f5c..d92d437f664 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -24,22 +24,31 @@ import com.google.common.collect.Maps;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -66,7 +75,7 @@ public class SqliteJdbcSinkTest {
         private int field3;
     }
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setUp() throws Exception {
         sqliteUtils.setUp();
         sqliteUtils.createTable(
@@ -111,7 +120,7 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
-        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        Message<GenericObject> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
@@ -123,7 +132,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] insertBytes = schema.encode(insertObj);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
             .message(insertMessage)
             .topicName("fake_topic_name")
             .ackFunction(() -> future.complete(null))
@@ -155,7 +164,7 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSinkJson(Map<String, String> actionProperties) throws Exception {
-        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        Message<GenericObject> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
@@ -166,7 +175,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] insertBytes = schema.encode(insertObj);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
             .message(insertMessage)
             .topicName("fake_topic_name")
             .ackFunction(() -> future.complete(null))
@@ -198,7 +207,7 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSinkNullValueJson(Map<String, String> actionProperties) throws Exception {
-        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        Message<GenericObject> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
@@ -210,7 +219,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] insertBytes = schema.encode(insertObj);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
             .message(insertMessage)
             .topicName("fake_topic_name")
             .ackFunction(() -> future.complete(null))
@@ -228,6 +237,7 @@ public class SqliteJdbcSinkTest {
         jdbcSink.write(insertRecord);
         log.info("executed write");
         // sleep to wait backend flush complete
+        // sleep to wait backend flush complete
         future.get(1, TimeUnit.SECONDS);
 
         // value has been written to db, read it out and verify.
@@ -242,7 +252,7 @@ public class SqliteJdbcSinkTest {
     }
 
     private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
-        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        Message<GenericObject> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
@@ -253,7 +263,7 @@ public class SqliteJdbcSinkTest {
 
         byte[] insertBytes = schema.encode(insertObj);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
             .message(insertMessage)
             .topicName("fake_topic_name")
             .ackFunction(() -> future.complete(null))
@@ -311,7 +321,7 @@ public class SqliteJdbcSinkTest {
 
     @Test
     public void TestUnknownAction() throws Exception {
-        Record<GenericRecord> recordRecord = mock(Record.class);
+        Record<GenericObject> recordRecord = mock(Record.class);
         when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));
         CompletableFuture<Void> future = new CompletableFuture<>();
         doAnswer(a -> future.complete(null)).when(recordRecord).fail();
@@ -330,9 +340,9 @@ public class SqliteJdbcSinkTest {
         updateObj.setField3(4);
 
         byte[] updateBytes = schema.encode(updateObj);
-        Message<GenericRecord> updateMessage = mock(MessageImpl.class);
+        Message<GenericObject> updateMessage = mock(MessageImpl.class);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
                 .message(updateMessage)
                 .topicName("fake_topic_name")
                 .ackFunction(() -> future.complete(null))
@@ -371,9 +381,9 @@ public class SqliteJdbcSinkTest {
         deleteObj.setField3(5);
 
         byte[] deleteBytes = schema.encode(deleteObj);
-        Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
+        Message<GenericObject> deleteMessage = mock(MessageImpl.class);
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
+        Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
                 .message(deleteMessage)
                 .topicName("fake_topic_name")
                 .ackFunction(() -> future.complete(null))
@@ -398,4 +408,106 @@ public class SqliteJdbcSinkTest {
         Assert.assertEquals(sqliteUtils.select(deleteQuerySql, (resultSet) -> {}), 0);
     }
 
+    @DataProvider(name = "schemaType")
+    public Object[] schemaType() {
+        return new Object[]{SchemaType.JSON, SchemaType.AVRO};
+    }
+
+
+    @Test(dataProvider = "schemaType")
+    public void testKeyValueSchema(SchemaType schemaType) throws Exception {
+        RecordSchemaBuilder keySchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+        keySchemaBuilder.field("key").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(schemaType));
+        GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+                .set("key", "mykey")
+                .build();
+
+        RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+        valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+        valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
+        valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+        valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
+        valueSchemaBuilder.field("float").type(SchemaType.FLOAT).optional().defaultValue(null);
+        valueSchemaBuilder.field("long").type(SchemaType.INT64).optional().defaultValue(null);
+        GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType));
+
+        GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+                .set("string", "thestring")
+                .set("int", Integer.MAX_VALUE)
+                .set("bool", true)
+                .set("double", Double.MAX_VALUE)
+                .set("float", Float.MAX_VALUE)
+                .set("long", Long.MIN_VALUE)
+                .build();
+
+        Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema = Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        KeyValue<GenericRecord, GenericRecord> keyValue = new KeyValue<>(keyGenericRecord, valueGenericRecord);
+        GenericObject genericObject = new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return keyValue;
+            }
+        };
+        Record<GenericObject> genericObjectRecord = new Record<>() {
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of("topic");
+            }
+
+            @Override
+            public org.apache.pulsar.client.api.Schema getSchema() {
+                return keyValueSchema;
+            }
+
+            @Override
+            public GenericObject getValue() {
+                return genericObject;
+            }
+        };
+        jdbcSink.close();
+        sqliteUtils.createTable(
+                "CREATE TABLE kvtable (" +
+                        "    key  TEXT," +
+                        "    int  INTEGER," +
+                        "    nulltext  TEXT," +
+                        "    bool  NUMERIC," +
+                        "    double NUMERIC," +
+                        "    float NUMERIC," +
+                        "    long INTEGER," +
+                        "PRIMARY KEY (key));"
+        );
+        String jdbcUrl = sqliteUtils.sqliteUri();
+
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", "kvtable");
+        conf.put("key", "key");
+        conf.put("nonKey", "long,int,double,float,bool,nulltext");
+        // change batchSize to 1, to flush on each write.
+        conf.put("batchSize", 1);
+        try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new SqliteJdbcAutoSchemaSink();) {
+            kvSchemaJdbcSink.open(conf, null);
+            kvSchemaJdbcSink.write(genericObjectRecord);
+
+            Awaitility.await().untilAsserted(() -> {
+                final int count = sqliteUtils.select("select int,bool,double,float,long,nulltext from kvtable where key='mykey'", (resultSet) -> {
+                    int index = 1;
+                    Assert.assertEquals(resultSet.getInt(index++), Integer.MAX_VALUE);
+                    Assert.assertEquals(resultSet.getBoolean(index++), true);
+                    Assert.assertEquals(resultSet.getDouble(index++), Double.MAX_VALUE);
+                    Assert.assertEquals(resultSet.getFloat(index++), Float.MAX_VALUE);
+                    Assert.assertEquals(resultSet.getLong(index++), Long.MIN_VALUE);
+                    Assert.assertNull(resultSet.getString(index++));
+                });
+                Assert.assertEquals(count, 1);
+            });
+        }
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
index a7be52fc372..e67626580b7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
@@ -22,12 +22,19 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.PostgreSQLContainer;
 
@@ -42,9 +49,6 @@ import static org.testng.Assert.fail;
 @Slf4j
 public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
 
-    /**
-     * A Simple class to test jdbc class,
-     */
     @Data
     public static class Foo {
         private String field1;
@@ -52,15 +56,30 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
         private int field3;
     }
 
+    @Data
+    @AllArgsConstructor
+    public static class KVSchemaKey {
+        private int field3;
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class KVSchemaValue {
+        private String field1;
+        private String field2;
+    }
+
     private static final String NAME = "jdbc-postgres";
     private static final String POSTGRES = "postgres";
 
     private final AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
     private final String tableName = "test";
     private Connection connection;
+    private boolean keyValueSchema;
 
-    public JdbcPostgresSinkTester() {
+    public JdbcPostgresSinkTester(boolean keyValueSchema) {
         super(NAME, SinkType.JDBC_POSTGRES);
+        this.keyValueSchema = keyValueSchema;
 
         // container default value is test
         sinkConfig.put("userName", "test");
@@ -73,12 +92,16 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
 
     @Override
     public Schema<?> getInputTopicSchema() {
-        return schema;
+        if (keyValueSchema) {
+            return Schema.AUTO_CONSUME();
+        } else {
+            return schema;
+        }
     }
 
     @Override
     protected PostgreSQLContainer createSinkService(PulsarCluster cluster) {
-        return (PostgreSQLContainer) new PostgreSQLContainer()
+        return (PostgreSQLContainer) new PostgreSQLContainer("postgres:14.3")
             .withNetworkAliases(POSTGRES);
     }
 
@@ -100,10 +123,33 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
         log.info("created table in jdbc: {}, return value: {}", createTable, ret);
     }
 
+    @Override
+    public void produceMessage(int numMessages, PulsarClient client, String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
+        if (!keyValueSchema) {
+            super.produceMessage(numMessages, client, inputTopicName, kvs);
+            return;
+        }
+
+        @Cleanup
+        Producer<KeyValue<KVSchemaKey, KVSchemaValue>> producer = client.newProducer(Schema.KeyValue(Schema.JSON(KVSchemaKey.class),
+                        Schema.AVRO(KVSchemaValue.class), KeyValueEncodingType.SEPARATED))
+                .topic(inputTopicName)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+            kvs.put(key, key);
+            producer.newMessage()
+                    .value(new KeyValue<>(new KVSchemaKey(i), new KVSchemaValue("f1_" + i, "f2_" + i)))
+                    .send();
+        }
+
+    }
+
     @Override
     public void validateSinkResult(Map<String, String> kvs) {
         log.info("Query table content from postgres server: {}", tableName);
-        String querySql = "SELECT * FROM " + tableName;
+        String querySql = "SELECT * FROM " + tableName + " ORDER BY field3";
         ResultSet rs;
         try {
             // backend flush may not complete.
@@ -114,25 +160,35 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
                 ResultSet.CONCUR_UPDATABLE);
             rs = statement.executeQuery();
 
-            if (kvs.get("ACTION").equals("DELETE")) {
+            if (!keyValueSchema && kvs.get("ACTION").equals("DELETE")) {
                 assertFalse(rs.first());
                 return;
             }
+            int index = 0;
             while (rs.next()) {
                 String field1 = rs.getString(1);
                 String field2 = rs.getString(2);
                 int field3 = rs.getInt(3);
-
-                String value = kvs.get("key-" + field3);
-
-                Foo obj = schema.decode(value.getBytes());
-                assertEquals(obj.field1, field1);
-                assertEquals(obj.field2, field2);
-                assertEquals(obj.field3, field3);
+                if (keyValueSchema) {
+                    assertEquals(field1, "f1_" + index);
+                    assertEquals(field2, "f2_" + index);
+                    assertEquals(field3, index);
+                } else {
+                    String value = kvs.get("key-" + field3);
+                    Foo obj = schema.decode(value.getBytes());
+                    assertEquals(obj.field1, field1);
+                    assertEquals(obj.field2, field2);
+                    assertEquals(obj.field3, field3);
+                }
+                index++;
             }
         } catch (Exception e) {
             log.error("Got exception: ", e);
-            fail("Got exception when op sql.");
+            fail("Got exception when op sql: " + e);
         }
     }
+
+    public boolean isKeyValueSchema() {
+        return keyValueSchema;
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index b6500e36a84..5fa96d77565 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -85,7 +85,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
 
             // produce messages
             Map<String, String> kvs;
-            if (tester instanceof JdbcPostgresSinkTester) {
+            if (tester instanceof JdbcPostgresSinkTester && !((JdbcPostgresSinkTester)tester).isKeyValueSchema()) {
                 kvs = produceSchemaInsertMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(JdbcPostgresSinkTester.Foo.class));
                 // wait for sink to process messages
                 Failsafe.with(statusRetryPolicy).run(() ->
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index 04eda16c57b..bec9c99e8a8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -55,9 +55,9 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new HdfsSinkTester(), false);
     }
 
-    @Test(groups = "sink")
-    public void testJdbcSink() throws Exception {
-        testSink(new JdbcPostgresSinkTester(), true);
+    @Test(groups = "sink", dataProvider = "withSchema")
+    public void testJdbcSink(boolean kvSchema) throws Exception {
+        testSink(new JdbcPostgresSinkTester(kvSchema), true);
     }
 
     @Test(groups = "sink", dataProvider = "withSchema")