You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/03 19:26:48 UTC

[GitHub] sijie closed pull request #2685: [schema] introduce AUTO_PRODUCE schema

sijie closed pull request #2685:  [schema] introduce AUTO_PRODUCE schema
URL: https://github.com/apache/pulsar/pull/2685
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 24573d5bc9..e2c8747564 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.fail;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Sets;
 import java.time.Clock;
@@ -459,7 +461,7 @@ public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
        }
 
        Consumer<GenericRecord> consumer = pulsarClient
-           .newConsumer(Schema.AUTO())
+           .newConsumer(Schema.AUTO_CONSUME())
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .subscriptionName("my-subscriber-name")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -507,7 +509,7 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception {
        }
 
        Reader<GenericRecord> reader = pulsarClient
-           .newReader(Schema.AUTO())
+           .newReader(Schema.AUTO_CONSUME())
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .startMessageId(MessageId.earliest)
            .create();
@@ -535,4 +537,76 @@ public void testAvroProducerAndAutoSchemaReader() throws Exception {
 
    }
 
+    @Test
+    public void testAutoBytesProducer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        AvroSchema<AvroEncodedPojo> avroSchema =
+            AvroSchema.of(AvroEncodedPojo.class);
+
+        try (Producer<AvroEncodedPojo> producer = pulsarClient
+            .newProducer(avroSchema)
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create()) {
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                producer.send(new AvroEncodedPojo(message));
+            }
+        }
+
+        try (Producer<byte[]> producer = pulsarClient
+            .newProducer(Schema.AUTO_PRODUCE_BYTES())
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .create()) {
+            // try to produce junk data
+            for (int i = 10; i < 20; i++) {
+                String message = "my-message-" + i;
+                byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+                byte[] junkData = new byte[data.length / 2];
+                System.arraycopy(data, 0, junkData, 0, junkData.length);
+                try {
+                    producer.send(junkData);
+                    fail("Should fail on sending junk data");
+                } catch (SchemaSerializationException sse) {
+                    // expected
+                }
+            }
+
+            for (int i = 10; i < 20; i++) {
+                String message = "my-message-" + i;
+                byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+                producer.send(data);
+            }
+        }
+
+        Consumer<GenericRecord> consumer = pulsarClient
+            .newConsumer(Schema.AUTO_CONSUME())
+            .topic("persistent://my-property/use/my-ns/my-topic1")
+            .subscriptionName("my-subscriber-name")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+        Message<GenericRecord> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 20; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            GenericRecord receivedMessage = msg.getValue();
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            String actualMessage = (String) receivedMessage.getField("message");
+            testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService()
+            .getSchema("my-property/my-ns/my-topic1")
+            .get();
+
+        Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema());
+
+        log.info("-- Exiting {} test --", methodName);
+
+    }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 113f26c50e..5432d43108 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,12 +19,20 @@
 package org.apache.pulsar.client.api;
 
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -80,7 +88,43 @@
         return JSONSchema.of(clazz);
     }
 
+    @Deprecated
     static Schema<GenericRecord> AUTO() {
-        return new AutoSchema();
+        return AUTO_CONSUME();
+    }
+
+    static Schema<GenericRecord> AUTO_CONSUME() {
+        return new AutoConsumeSchema();
+    }
+
+    static Schema<byte[]> AUTO_PRODUCE_BYTES() {
+        return new AutoProduceBytesSchema();
+    }
+
+    static Schema<?> getSchema(SchemaInfo schemaInfo) {
+        switch (schemaInfo.getType()) {
+            case INT8:
+                return ByteSchema.of();
+            case INT16:
+                return ShortSchema.of();
+            case INT32:
+                return IntSchema.of();
+            case INT64:
+                return LongSchema.of();
+            case STRING:
+                return StringSchema.utf8();
+            case FLOAT:
+                return FloatSchema.of();
+            case DOUBLE:
+                return DoubleSchema.of();
+            case BYTES:
+                return BytesSchema.of();
+            case JSON:
+            case AVRO:
+                return GenericSchema.of(schemaInfo);
+            default:
+                throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
+                    + schemaInfo.getType() + "' is not supported yet");
+        }
     }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
similarity index 96%
rename from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
rename to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 5bf92b71cb..dc90931d4f 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -28,7 +28,7 @@
 /**
  * Auto detect schema.
  */
-public class AutoSchema implements Schema<GenericRecord> {
+public class AutoConsumeSchema implements Schema<GenericRecord> {
 
     private Schema<GenericRecord> schema;
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
new file mode 100644
index 0000000000..36520a9ee7
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Pass-through byte[] schema that checks each payload can be decoded by an underlying schema.
+ */
+public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
+
+    private Schema<T> schema;
+
+    public void setSchema(Schema<T> schema) {
+        this.schema = schema;
+    }
+
+    private void ensureSchemaInitialized() {
+        checkState(null != schema, "Schema is not initialized before used");
+    }
+
+    @Override
+    public byte[] encode(byte[] message) {
+        ensureSchemaInitialized();
+
+        // verify if the message can be decoded by the underlying schema
+        schema.decode(message);
+
+        return message;
+    }
+
+    @Override
+    public byte[] decode(byte[] bytes) {
+        ensureSchemaInitialized();
+
+        // verify the message can be decoded by the underlying schema
+        schema.decode(bytes);
+
+        return bytes;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        ensureSchemaInitialized();
+
+        return schema.getSchemaInfo();
+    }
+}
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index f07c928870..f00d05da0b 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -25,7 +25,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.testng.annotations.Test;
@@ -53,7 +53,7 @@ public void testGenericJsonSchema() {
     @Test
     public void testAutoAvroSchema() {
         Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
-        AutoSchema decodeSchema = new AutoSchema();
+        AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
         decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
@@ -61,7 +61,7 @@ public void testAutoAvroSchema() {
     @Test
     public void testAutoJsonSchema() {
         Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
-        AutoSchema decodeSchema = new AutoSchema();
+        AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
         decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
         testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9640c09dd7..bac6a85164 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -62,7 +62,8 @@
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@@ -250,12 +251,12 @@ public ClientConfigurationData getConfiguration() {
           ProducerInterceptors<T> interceptors) {
         if (conf == null) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
+                new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
         }
 
-        if (schema instanceof AutoSchema) {
+        if (schema instanceof AutoConsumeSchema) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically"));
+                new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically"));
         }
 
         if (state.get() != State.Open) {
@@ -266,9 +267,30 @@ public ClientConfigurationData getConfiguration() {
 
         if (!TopicName.isValid(topic)) {
             return FutureUtil.failedFuture(
-                    new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+                new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
         }
 
+        if (schema instanceof AutoProduceBytesSchema) {
+            AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
+            return lookup.getSchema(TopicName.get(conf.getTopicName()))
+                    .thenCompose(schemaInfoOptional -> {
+                        if (schemaInfoOptional.isPresent()) {
+                            autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
+                        } else {
+                            autoProduceBytesSchema.setSchema(Schema.BYTES);
+                        }
+                        return createProducerAsync(topic, conf, schema, interceptors);
+                    });
+        } else {
+            return createProducerAsync(topic, conf, schema, interceptors);
+        }
+
+    }
+
+    private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
+                                                                   ProducerConfigurationData conf,
+                                                                   Schema<T> schema,
+                                                                   ProducerInterceptors<T> interceptors) {
         CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();
 
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
@@ -392,15 +414,15 @@ public ClientConfigurationData getConfiguration() {
     }
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        if (schema instanceof AutoSchema) {
-            AutoSchema autoSchema = (AutoSchema) schema;
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
                             GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericSchema);
+                            autoConsumeSchema.setSchema(genericSchema);
                             return doSingleTopicSubscribeAsync(conf, schema, interceptors);
                         } else {
                             return FutureUtil.failedFuture(
@@ -546,15 +568,15 @@ public ClientConfigurationData getConfiguration() {
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
-        if (schema instanceof AutoSchema) {
-            AutoSchema autoSchema = (AutoSchema) schema;
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getTopicName()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
                             GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericSchema);
+                            autoConsumeSchema.setSchema(genericSchema);
                             return doCreateReaderAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index a19d587e17..4b4e576b6d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -85,5 +85,16 @@
     /**
      * Auto Detect Schema Type.
      */
-    AUTO
+    @Deprecated
+    AUTO,
+
+    /**
+     * Auto Consume Type.
+     */
+    AUTO_CONSUME,
+
+    /**
+     * Auto Publish Type.
+     */
+    AUTO_PUBLISH
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index db9e880b68..cbf97f1330 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -77,7 +77,7 @@ public TopicSchema(PulsarClient client) {
      */
     private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
         if (GenericRecord.class.isAssignableFrom(clazz)) {
-            return SchemaType.AUTO;
+            return SchemaType.AUTO_CONSUME;
         } else if (byte[].class.equals(clazz)) {
             // if function uses bytes, we should ignore
             return SchemaType.NONE;
@@ -96,7 +96,7 @@ private static SchemaType getDefaultSchemaType(Class<?> clazz) {
             return SchemaType.NONE;
         } else if (GenericRecord.class.isAssignableFrom(clazz)) {
             // the function is taking generic record, so we do auto schema detection
-            return SchemaType.AUTO;
+            return SchemaType.AUTO_CONSUME;
         } else if (String.class.equals(clazz)) {
             // If type is String, then we use schema type string, otherwise we fallback on default schema
             return SchemaType.STRING;
@@ -113,8 +113,9 @@ private static SchemaType getDefaultSchemaType(Class<?> clazz) {
         case NONE:
             return (Schema<T>) Schema.BYTES;
 
+        case AUTO_CONSUME:
         case AUTO:
-            return (Schema<T>) Schema.AUTO();
+            return (Schema<T>) Schema.AUTO_CONSUME();
 
         case STRING:
             return (Schema<T>) Schema.STRING;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b92ae27474..45a861bce1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -71,6 +71,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -635,13 +636,15 @@ public Response triggerFunction(final String tenant, final String namespace, fin
             return Response.status(Status.BAD_REQUEST).build();
         }
         String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
-        Reader reader = null;
-        Producer producer = null;
+        Reader<byte[]> reader = null;
+        Producer<byte[]> producer = null;
         try {
             if (outputTopic != null && !outputTopic.isEmpty()) {
                 reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
             }
-            producer = worker().getClient().newProducer().topic(inputTopicToWrite).create();
+            producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(inputTopicToWrite)
+                .create();
             byte[] targetArray;
             if (uploadedInputStream != null) {
                 targetArray = new byte[uploadedInputStream.available()];
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 5067f55de7..24a7c6e883 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -30,7 +30,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.functions.api.Record;
@@ -99,10 +99,10 @@ public void TestOpenAndWriteSink() throws Exception {
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        AutoSchema autoSchema = new AutoSchema();
-        autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+        autoConsumeSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
 
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoSchema);
+        Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
         Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services