You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/03 19:26:51 UTC
[pulsar] branch master updated: [schema] introduce AUTO_PRODUCE
schema (#2685)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1f37326 [schema] introduce AUTO_PRODUCE schema (#2685)
1f37326 is described below
commit 1f3732624bf8c398385175a4095fca8a3bcbf829
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 3 12:26:46 2018 -0700
[schema] introduce AUTO_PRODUCE schema (#2685)
*Motivation*
Currently trigger function is broken due to the schema enforcement we added recently:
A producer without schema can't produce messages into a topic with schema.
*Changes*
- Rename `AUTO` to `AUTO_CONSUME`
- Introduce `AUTO_PRODUCE` schema. The schema produces `byte[]`, but it will validate
the bytes are compatible with the schema associated with the topic before producing.
- Change trigger function to use `AUTO_PRODUCE` schema.
---
.../api/SimpleTypedProducerConsumerTest.java | 78 +++++++++++++++++++++-
.../java/org/apache/pulsar/client/api/Schema.java | 48 ++++++++++++-
.../{AutoSchema.java => AutoConsumeSchema.java} | 2 +-
...AutoSchema.java => AutoProduceBytesSchema.java} | 21 +++---
.../impl/schema/generic/GenericSchemaTest.java | 6 +-
.../pulsar/client/impl/PulsarClientImpl.java | 44 +++++++++---
.../apache/pulsar/common/schema/SchemaType.java | 13 +++-
.../pulsar/functions/source/TopicSchema.java | 7 +-
.../functions/worker/rest/api/FunctionsImpl.java | 9 ++-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 8 +--
10 files changed, 198 insertions(+), 38 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 24573d5..e2c8747 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.fail;
+
import com.google.common.base.MoreObjects;
import com.google.common.collect.Sets;
import java.time.Clock;
@@ -459,7 +461,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
Consumer<GenericRecord> consumer = pulsarClient
- .newConsumer(Schema.AUTO())
+ .newConsumer(Schema.AUTO_CONSUME())
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -507,7 +509,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
Reader<GenericRecord> reader = pulsarClient
- .newReader(Schema.AUTO())
+ .newReader(Schema.AUTO_CONSUME())
.topic("persistent://my-property/use/my-ns/my-topic1")
.startMessageId(MessageId.earliest)
.create();
@@ -535,4 +537,76 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
+ @Test
+ public void testAutoBytesProducer() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ AvroSchema<AvroEncodedPojo> avroSchema =
+ AvroSchema.of(AvroEncodedPojo.class);
+
+ try (Producer<AvroEncodedPojo> producer = pulsarClient
+ .newProducer(avroSchema)
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .create()) {
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(new AvroEncodedPojo(message));
+ }
+ }
+
+ try (Producer<byte[]> producer = pulsarClient
+ .newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .create()) {
+ // try to produce junk data
+ for (int i = 10; i < 20; i++) {
+ String message = "my-message-" + i;
+ byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+ byte[] junkData = new byte[data.length / 2];
+ System.arraycopy(data, 0, junkData, 0, junkData.length);
+ try {
+ producer.send(junkData);
+ fail("Should fail on sending junk data");
+ } catch (SchemaSerializationException sse) {
+ // expected
+ }
+ }
+
+ for (int i = 10; i < 20; i++) {
+ String message = "my-message-" + i;
+ byte[] data = avroSchema.encode(new AvroEncodedPojo(message));
+ producer.send(data);
+ }
+ }
+
+ Consumer<GenericRecord> consumer = pulsarClient
+ .newConsumer(Schema.AUTO_CONSUME())
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < 20; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ GenericRecord receivedMessage = msg.getValue();
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ String actualMessage = (String) receivedMessage.getField("message");
+ testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService()
+ .getSchema("my-property/my-ns/my-topic1")
+ .get();
+
+ Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema());
+
+ log.info("-- Exiting {} test --", methodName);
+
+ }
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
index 113f26c..5432d43 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,12 +19,20 @@
package org.apache.pulsar.client.api;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -80,7 +88,43 @@ public interface Schema<T> {
return JSONSchema.of(clazz);
}
+ @Deprecated
static Schema<GenericRecord> AUTO() {
- return new AutoSchema();
+ return AUTO_CONSUME();
+ }
+
+ static Schema<GenericRecord> AUTO_CONSUME() {
+ return new AutoConsumeSchema();
+ }
+
+ static Schema<byte[]> AUTO_PRODUCE_BYTES() {
+ return new AutoProduceBytesSchema();
+ }
+
+ static Schema<?> getSchema(SchemaInfo schemaInfo) {
+ switch (schemaInfo.getType()) {
+ case INT8:
+ return ByteSchema.of();
+ case INT16:
+ return ShortSchema.of();
+ case INT32:
+ return IntSchema.of();
+ case INT64:
+ return LongSchema.of();
+ case STRING:
+ return StringSchema.utf8();
+ case FLOAT:
+ return FloatSchema.of();
+ case DOUBLE:
+ return DoubleSchema.of();
+ case BYTES:
+ return BytesSchema.of();
+ case JSON:
+ case AVRO:
+ return GenericSchema.of(schemaInfo);
+ default:
+ throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
+ + schemaInfo.getType() + "' is not supported yet");
+ }
}
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
similarity index 96%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 5bf92b7..dc90931 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
/**
* Auto detect schema.
*/
-public class AutoSchema implements Schema<GenericRecord> {
+public class AutoConsumeSchema implements Schema<GenericRecord> {
private Schema<GenericRecord> schema;
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
similarity index 75%
rename from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
rename to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 5bf92b7..36520a9 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -22,17 +22,16 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkState;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
* Auto detect schema.
*/
-public class AutoSchema implements Schema<GenericRecord> {
+public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
- private Schema<GenericRecord> schema;
+ private Schema<T> schema;
- public void setSchema(Schema<GenericRecord> schema) {
+ public void setSchema(Schema<T> schema) {
this.schema = schema;
}
@@ -41,17 +40,23 @@ public class AutoSchema implements Schema<GenericRecord> {
}
@Override
- public byte[] encode(GenericRecord message) {
+ public byte[] encode(byte[] message) {
ensureSchemaInitialized();
- return schema.encode(message);
+ // verify if the message can be decoded by the underlying schema
+ schema.decode(message);
+
+ return message;
}
@Override
- public GenericRecord decode(byte[] bytes) {
+ public byte[] decode(byte[] bytes) {
ensureSchemaInitialized();
- return schema.decode(bytes);
+ // verify the message can be detected by the underlying schema
+ schema.decode(bytes);
+
+ return bytes;
}
@Override
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index f07c928..f00d05d 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -25,7 +25,7 @@ import static org.testng.Assert.assertTrue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
import org.testng.annotations.Test;
@@ -53,7 +53,7 @@ public class GenericSchemaTest {
@Test
public void testAutoAvroSchema() {
Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
- AutoSchema decodeSchema = new AutoSchema();
+ AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@@ -61,7 +61,7 @@ public class GenericSchemaTest {
@Test
public void testAutoJsonSchema() {
Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
- AutoSchema decodeSchema = new AutoSchema();
+ AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9640c09..bac6a85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -62,7 +62,8 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@@ -250,12 +251,12 @@ public class PulsarClientImpl implements PulsarClient {
ProducerInterceptors<T> interceptors) {
if (conf == null) {
return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
+ new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
}
- if (schema instanceof AutoSchema) {
+ if (schema instanceof AutoConsumeSchema) {
return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically"));
+ new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically"));
}
if (state.get() != State.Open) {
@@ -266,9 +267,30 @@ public class PulsarClientImpl implements PulsarClient {
if (!TopicName.isValid(topic)) {
return FutureUtil.failedFuture(
- new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
+ new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
}
+ if (schema instanceof AutoProduceBytesSchema) {
+ AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
+ return lookup.getSchema(TopicName.get(conf.getTopicName()))
+ .thenCompose(schemaInfoOptional -> {
+ if (schemaInfoOptional.isPresent()) {
+ autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
+ } else {
+ autoProduceBytesSchema.setSchema(Schema.BYTES);
+ }
+ return createProducerAsync(topic, conf, schema, interceptors);
+ });
+ } else {
+ return createProducerAsync(topic, conf, schema, interceptors);
+ }
+
+ }
+
+ private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
+ ProducerConfigurationData conf,
+ Schema<T> schema,
+ ProducerInterceptors<T> interceptors) {
CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
@@ -392,15 +414,15 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- if (schema instanceof AutoSchema) {
- AutoSchema autoSchema = (AutoSchema) schema;
+ if (schema instanceof AutoConsumeSchema) {
+ AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoSchema.setSchema(genericSchema);
+ autoConsumeSchema.setSchema(genericSchema);
return doSingleTopicSubscribeAsync(conf, schema, interceptors);
} else {
return FutureUtil.failedFuture(
@@ -546,15 +568,15 @@ public class PulsarClientImpl implements PulsarClient {
}
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
- if (schema instanceof AutoSchema) {
- AutoSchema autoSchema = (AutoSchema) schema;
+ if (schema instanceof AutoConsumeSchema) {
+ AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoSchema.setSchema(genericSchema);
+ autoConsumeSchema.setSchema(genericSchema);
return doCreateReaderAsync(conf, schema);
} else {
return FutureUtil.failedFuture(
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index a19d587..4b4e576 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -85,5 +85,16 @@ public enum SchemaType {
/**
* Auto Detect Schema Type.
*/
- AUTO
+ @Deprecated
+ AUTO,
+
+ /**
+ * Auto Consume Type.
+ */
+ AUTO_CONSUME,
+
+ /**
+ * Auto Publish Type.
+ */
+ AUTO_PUBLISH
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index db9e880..cbf97f1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -77,7 +77,7 @@ public class TopicSchema {
*/
private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
if (GenericRecord.class.isAssignableFrom(clazz)) {
- return SchemaType.AUTO;
+ return SchemaType.AUTO_CONSUME;
} else if (byte[].class.equals(clazz)) {
// if function uses bytes, we should ignore
return SchemaType.NONE;
@@ -96,7 +96,7 @@ public class TopicSchema {
return SchemaType.NONE;
} else if (GenericRecord.class.isAssignableFrom(clazz)) {
// the function is taking generic record, so we do auto schema detection
- return SchemaType.AUTO;
+ return SchemaType.AUTO_CONSUME;
} else if (String.class.equals(clazz)) {
// If type is String, then we use schema type string, otherwise we fallback on default schema
return SchemaType.STRING;
@@ -113,8 +113,9 @@ public class TopicSchema {
case NONE:
return (Schema<T>) Schema.BYTES;
+ case AUTO_CONSUME:
case AUTO:
- return (Schema<T>) Schema.AUTO();
+ return (Schema<T>) Schema.AUTO_CONSUME();
case STRING:
return (Schema<T>) Schema.STRING;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 4aa3016..ee3b480 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -651,13 +652,15 @@ public class FunctionsImpl {
return Response.status(Status.BAD_REQUEST).build();
}
String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
- Reader reader = null;
- Producer producer = null;
+ Reader<byte[]> reader = null;
+ Producer<byte[]> producer = null;
try {
if (outputTopic != null && !outputTopic.isEmpty()) {
reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
}
- producer = worker().getClient().newProducer().topic(inputTopicToWrite).create();
+ producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(inputTopicToWrite)
+ .create();
byte[] targetArray;
if (uploadedInputStream != null) {
targetArray = new byte[uploadedInputStream.available()];
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 5067f55..24a7c6e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.functions.api.Record;
@@ -99,10 +99,10 @@ public class JdbcSinkTest {
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
- AutoSchema autoSchema = new AutoSchema();
- autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+ AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+ autoConsumeSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoSchema);
+ Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")