You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/03 19:26:51 UTC

[pulsar] branch master updated: [schema] introduce AUTO_PRODUCE schema (#2685)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f37326   [schema] introduce AUTO_PRODUCE schema (#2685)
1f37326 is described below

commit 1f3732624bf8c398385175a4095fca8a3bcbf829
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 12:26:46 2018 -0700

     [schema] introduce AUTO_PRODUCE schema (#2685)
    
    *Motivation*
    
    Currently trigger function is broken due to the schema enforcement we added recently:
    A producer without schema can't produce messages into a topic with schema.
    
    *Changes*
    
    - Rename `AUTO` to `AUTO_CONSUME`
    - Introduce `AUTO_PRODUCE` schema. The schema produces `byte[]`, but it will validate
      the bytes are compatible with the schema associated with the topic before producing.
    - Change trigger function to use `AUTO_PRODUCE` schema.
---
 .../api/SimpleTypedProducerConsumerTest.java       | 78 +++++++++++++++++++++-
 .../java/org/apache/pulsar/client/api/Schema.java  | 48 ++++++++++++-
 .../{AutoSchema.java => AutoConsumeSchema.java}    |  2 +-
 ...AutoSchema.java => AutoProduceBytesSchema.java} | 21 +++---
 .../impl/schema/generic/GenericSchemaTest.java     |  6 +-
 .../pulsar/client/impl/PulsarClientImpl.java       | 44 +++++++++---
 .../apache/pulsar/common/schema/SchemaType.java    | 13 +++-
 .../pulsar/functions/source/TopicSchema.java       |  7 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |  9 ++-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    |  8 +--
 10 files changed, 198 insertions(+), 38 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 24573d5..e2c8747 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.fail;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Sets;
 import java.time.Clock;
@@ -459,7 +461,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        }
 
        Consumer<GenericRecord> consumer = pulsarClient
-           .newConsumer(Schema.AUTO())
+           .newConsumer(Schema.AUTO_CONSUME())
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .subscriptionName("my-subscriber-name")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -507,7 +509,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        }
 
        Reader<GenericRecord> reader = pulsarClient
-           .newReader(Schema.AUTO())
+           .newReader(Schema.AUTO_CONSUME())
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .startMessageId(MessageId.earliest)
            .create();
@@ -535,4 +537,76 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
 
    }
 
+    @Test
+    public void testAutoBytesProducer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        AvroSchema<AvroEncodedPojo> avroSchema =
+            AvroSchema.of(AvroEncodedPojo.class);
+
+        try (Producer<AvroEncodedPojo> producer = pulsarClient
+            .newProducer(avroSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create()) {
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                producer.send(new AvroEncodedPojo(message));
+            }
+        }
+
+        try (Producer<byte[]> producer = pulsarClient
+            .newProducer(Schema.AUTO_PRODUCE_BYTES())
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create()) {
+            // try to produce junk data
+            for (int i = 10; i < 20; i++) {
+                String message = "my-message-" + i;
+                byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+                byte[] junkData = new byte[data.length / 2];
+                System.arraycopy(data, 0, junkData, 0, junkData.length);
+                try {
+                    producer.send(junkData);
+                    fail("Should fail on sending junk data");
+                } catch (SchemaSerializationException sse) {
+                    // expected
+                }
+            }
+
+            for (int i = 10; i < 20; i++) {
+                String message = "my-message-" + i;
+                byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+                producer.send(data);
+            }
+        }
+
+        Consumer<GenericRecord> consumer = pulsarClient
+            .newConsumer(Schema.AUTO_CONSUME())
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+        Message<GenericRecord> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 20; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            GenericRecord receivedMessage = msg.getValue();
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            String actualMessage = (String) receivedMessage.getField("message");
+            testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService()
+            .getSchema("my-property/my-ns/my-topic1")
+            .get();
+
+        Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema());
+
+        log.info("-- Exiting {} test --", methodName);
+
+    }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 113f26c..5432d43 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,12 +19,20 @@
 package org.apache.pulsar.client.api;
 
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -80,7 +88,43 @@ public interface Schema<T> {
         return JSONSchema.of(clazz);
     }
 
+    @Deprecated
     static Schema<GenericRecord> AUTO() {
-        return new AutoSchema();
+        return AUTO_CONSUME();
+    }
+
+    static Schema<GenericRecord> AUTO_CONSUME() {
+        return new AutoConsumeSchema();
+    }
+
+    static Schema<byte[]> AUTO_PRODUCE_BYTES() {
+        return new AutoProduceBytesSchema();
+    }
+
+    static Schema<?> getSchema(SchemaInfo schemaInfo) {
+        switch (schemaInfo.getType()) {
+            case INT8:
+                return ByteSchema.of();
+            case INT16:
+                return ShortSchema.of();
+            case INT32:
+                return IntSchema.of();
+            case INT64:
+                return LongSchema.of();
+            case STRING:
+                return StringSchema.utf8();
+            case FLOAT:
+                return FloatSchema.of();
+            case DOUBLE:
+                return DoubleSchema.of();
+            case BYTES:
+                return BytesSchema.of();
+            case JSON:
+            case AVRO:
+                return GenericSchema.of(schemaInfo);
+            default:
+                throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
+                    + schemaInfo.getType() + "' is not supported yet");
+        }
     }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
similarity index 96%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 5bf92b7..dc90931 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 /**
  * Auto detect schema.
  */
-public class AutoSchema implements Schema<GenericRecord> {
+public class AutoConsumeSchema implements Schema<GenericRecord> {
 
     private Schema<GenericRecord> schema;
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
similarity index 75%
rename from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
rename to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 5bf92b7..36520a9 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -22,17 +22,16 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * Auto detect schema.
  */
-public class AutoSchema implements Schema<GenericRecord> {
+public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
-    private Schema<GenericRecord> schema;
+    private Schema<T> schema;
 
-    public void setSchema(Schema<GenericRecord> schema) {
+    public void setSchema(Schema<T> schema) {
         this.schema = schema;
     }
 
@@ -41,17 +40,23 @@ public class AutoSchema implements Schema<GenericRecord> {
     }
 
     @Override
-    public byte[] encode(GenericRecord message) {
+    public byte[] encode(byte[] message) {
         ensureSchemaInitialized();
 
-        return schema.encode(message);
+        // verify if the message can be decoded by the underlying schema
+        schema.decode(message);
+
+        return message;
     }
 
     @Override
-    public GenericRecord decode(byte[] bytes) {
+    public byte[] decode(byte[] bytes) {
         ensureSchemaInitialized();
 
-        return schema.decode(bytes);
+        // verify the message can be detected by the underlying schema
+        schema.decode(bytes);
+
+        return bytes;
     }
 
     @Override
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index f07c928..f00d05d 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -25,7 +25,7 @@ import static org.testng.Assert.assertTrue;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.testng.annotations.Test;
@@ -53,7 +53,7 @@ public class GenericSchemaTest {
     @Test
     public void testAutoAvroSchema() {
         Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
-        AutoSchema decodeSchema = new AutoSchema();
+        AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
         decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
@@ -61,7 +61,7 @@ public class GenericSchemaTest {
     @Test
     public void testAutoJsonSchema() {
         Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
-        AutoSchema decodeSchema = new AutoSchema();
+        AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
         decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9640c09..bac6a85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -62,7 +62,8 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@@ -250,12 +251,12 @@ public class PulsarClientImpl implements PulsarClient {
           ProducerInterceptors<T> interceptors) {
         if (conf == null) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
+                new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
         }
 
-        if (schema instanceof AutoSchema) {
+        if (schema instanceof AutoConsumeSchema) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically"));
+                new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically"));
         }
 
         if (state.get() != State.Open) {
@@ -266,9 +267,30 @@ public class PulsarClientImpl implements PulsarClient {
 
         if (!TopicName.isValid(topic)) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+                new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
         }
 
+        if (schema instanceof AutoProduceBytesSchema) {
+            AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getTopicName()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent()) {
+                            autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
+                        } else {
+                            autoProduceBytesSchema.setSchema(Schema.BYTES);
+                        }
+                        return createProducerAsync(topic, conf, schema, interceptors);
+                    });
+        } else {
+            return createProducerAsync(topic, conf, schema, interceptors);
+        }
+
+    }
+
+    private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
+                                                                   ProducerConfigurationData conf,
+                                                                   Schema<T> schema,
+                                                                   ProducerInterceptors<T> interceptors) {
         CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();
 
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
@@ -392,15 +414,15 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        if (schema instanceof AutoSchema) {
-            AutoSchema autoSchema = (AutoSchema) schema;
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
                             GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericSchema);
+                            autoConsumeSchema.setSchema(genericSchema);
                             return doSingleTopicSubscribeAsync(conf, schema, interceptors);
                         } else {
                             return FutureUtil.failedFuture(
@@ -546,15 +568,15 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
-        if (schema instanceof AutoSchema) {
-            AutoSchema autoSchema = (AutoSchema) schema;
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getTopicName()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
                             GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericSchema);
+                            autoConsumeSchema.setSchema(genericSchema);
                             return doCreateReaderAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index a19d587..4b4e576 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -85,5 +85,16 @@ public enum SchemaType {
     /**
      * Auto Detect Schema Type.
      */
-    AUTO
+    @Deprecated
+    AUTO,
+
+    /**
+     * Auto Consume Type.
+     */
+    AUTO_CONSUME,
+
+    /**
+     * Auto Publish Type.
+     */
+    AUTO_PUBLISH
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index db9e880..cbf97f1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -77,7 +77,7 @@ public class TopicSchema {
      */
     private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
         if (GenericRecord.class.isAssignableFrom(clazz)) {
-            return SchemaType.AUTO;
+            return SchemaType.AUTO_CONSUME;
         } else if (byte[].class.equals(clazz)) {
             // if function uses bytes, we should ignore
             return SchemaType.NONE;
@@ -96,7 +96,7 @@ public class TopicSchema {
             return SchemaType.NONE;
         } else if (GenericRecord.class.isAssignableFrom(clazz)) {
             // the function is taking generic record, so we do auto schema detection
-            return SchemaType.AUTO;
+            return SchemaType.AUTO_CONSUME;
         } else if (String.class.equals(clazz)) {
             // If type is String, then we use schema type string, otherwise we fallback on default schema
             return SchemaType.STRING;
@@ -113,8 +113,9 @@ public class TopicSchema {
         case NONE:
             return (Schema<T>) Schema.BYTES;
 
+        case AUTO_CONSUME:
         case AUTO:
-            return (Schema<T>) Schema.AUTO();
+            return (Schema<T>) Schema.AUTO_CONSUME();
 
         case STRING:
             return (Schema<T>) Schema.STRING;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4aa3016..ee3b480 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -651,13 +652,15 @@ public class FunctionsImpl {
             return Response.status(Status.BAD_REQUEST).build();
         }
         String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
-        Reader reader = null;
-        Producer producer = null;
+        Reader<byte[]> reader = null;
+        Producer<byte[]> producer = null;
         try {
             if (outputTopic != null && !outputTopic.isEmpty()) {
                 reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
             }
-            producer = worker().getClient().newProducer().topic(inputTopicToWrite).create();
+            producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(inputTopicToWrite)
+                .create();
             byte[] targetArray;
             if (uploadedInputStream != null) {
                 targetArray = new byte[uploadedInputStream.available()];
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 5067f55..24a7c6e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.functions.api.Record;
@@ -99,10 +99,10 @@ public class JdbcSinkTest {
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        AutoSchema autoSchema = new AutoSchema();
-        autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoSchema);
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")