You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/14 12:18:10 UTC
[pulsar] 06/14: Use Message.getReaderSchema() in Pulsar IO Sinks when possible (#10557)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_rootless
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5dee3ebe51ee44c859c96f580ddad4769b9f0895
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Fri May 14 07:29:53 2021 +0200
Use Message.getReaderSchema() in Pulsar IO Sinks when possible (#10557)
(cherry picked from commit 90117b2be8df9893e0f9a7b6829d48bdddc7c55f)
---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 5 ++
.../client/impl/schema/AbstractStructSchema.java | 8 ++
.../pulsar/client/impl/schema/KeyValueSchema.java | 10 ++-
.../pulsar/functions/instance/SinkRecord.java | 12 ++-
.../integration/io/TestGenericObjectSink.java | 15 +++-
.../io/PulsarGenericObjectSinkTest.java | 86 +++++++++++++++++++++-
6 files changed, 131 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index d594688..af49f96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -616,6 +616,11 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertEquals(data.getKey().getField("i"), i * 100);
assertEquals(data.getValue().getField("i"), i * 1000);
c0.acknowledge(wrapper);
+ Schema<?> schema = wrapper.getReaderSchema().get();
+ KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+ assertEquals(SchemaType.AVRO, keyValueSchema.getKeySchema().getSchemaInfo().getType());
+ assertEquals(SchemaType.AVRO, keyValueSchema.getValueSchema().getSchemaInfo().getType());
+ assertNotNull(schema.getSchemaInfo());
}
// verify c1
for (int i = 0; i < numMessages; i++) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index c4444e7..ce68434 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -158,6 +158,14 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
return Optional.empty();
}
}
+
+ @Override
+ public String toString() {
+ return "VersionedSchema(type=" + schemaInfo.getType() +
+ ",schemaVersion="+BytesSchemaVersion.of(schemaVersion) +
+ ",name="+schemaInfo.getName()
+ + ")";
+ }
}
private AbstractStructSchema<T> getAbstractStructSchemaAtVersion(byte[] schemaVersion, SchemaInfo schemaInfo) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index f572bbf..c33de77 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -122,6 +122,8 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
// defer configuring the key/value schema info until `configureSchemaInfo` is called.
if (!requireFetchingSchemaInfo()) {
configureKeyValueSchemaInfo();
+ } else {
+ buildKeyValueSchemaInfo();
}
}
@@ -224,10 +226,14 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType);
}
- private void configureKeyValueSchemaInfo() {
+ /**
+ * Encodes keySchema, valueSchema and keyValueEncodingType into this schema's combined
+ * {@code schemaInfo}. It only assigns {@code schemaInfo}; unlike configureKeyValueSchemaInfo,
+ * it does not install SchemaInfoProviders on the key/value schemas. The constructor calls it
+ * directly when requireFetchingSchemaInfo() is true, so that {@code schemaInfo} is set even
+ * though full configuration is deferred until configureSchemaInfo is called.
+ */
+ private void buildKeyValueSchemaInfo() {
this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
- keySchema, valueSchema, keyValueEncodingType
+ keySchema, valueSchema, keyValueEncodingType
);
+ }
+
+ private void configureKeyValueSchemaInfo() {
+ buildKeyValueSchemaInfo();
this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 38c7036..a7ff0eb 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -98,8 +98,18 @@ public class SinkRecord<T> implements Record<T> {
if (sourceRecord.getSchema() != null) {
// unwrap actual schema
Schema<T> schema = sourceRecord.getSchema();
+ // AutoConsumeSchema is a special schema, that comes into play
+ // when the Sink is going to handle any Schema
+ // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
if (schema instanceof AutoConsumeSchema) {
- schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
+ // extract the Schema from the message, this is the most accurate schema we have
+ // see PIP-85
+ if (sourceRecord.getMessage().isPresent()
+ && sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
+ schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get();
+ } else {
+ schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
+ }
}
return schema;
}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index b7645ba..63b3bac 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -41,10 +41,13 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
log.info("received record {} {}", record, record.getClass());
log.info("schema {}", record.getSchema());
log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null));
+ log.info("schemaInfo {}", record.getSchema().getSchemaInfo());
+ log.info("schemaInfo.type {}", record.getSchema().getSchemaInfo().getType());
String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING");
+ log.info("expectedRecordType {}", expectedRecordType);
if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) {
- throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType);
+ throw new RuntimeException("Unexpected record type " + record.getSchema().getSchemaInfo().getType().name() + " is not " + expectedRecordType);
}
log.info("value {}", record.getValue());
@@ -66,6 +69,16 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());
+ String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", "");
+ log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition());
+ log.info("expectedSchemaDefinition {}", expectedSchemaDefinition);
+ if (!expectedSchemaDefinition.isEmpty()) {
+ String schemaDefinition = record.getSchema().getSchemaInfo().getSchemaDefinition();
+ if (!expectedSchemaDefinition.equals(schemaDefinition)) {
+ throw new RuntimeException("Unexpected schema definition " + schemaDefinition + " is not " + expectedSchemaDefinition);
+ }
+ }
+
record.ack();
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index c810b1b..a728f5b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -74,13 +74,21 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
+ /** Test payload; in testGenericObjectSinkWithSchemaChange it is the first (v1) AVRO schema, followed by PojoV2. */
@Data
@Builder
- public static final class Pojo {
+ public static class Pojo {
private String field1;
private int field2;
}
+ /**
+ * Second version of {@link Pojo}: the same fields plus {@code field3}. testGenericObjectSinkWithSchemaChange
+ * produces it after {@link Pojo} on the same topic, so consecutive messages carry different AVRO schemas.
+ * Field order is part of the generated AVRO schema definition, so do not reorder fields.
+ */
@Data
@Builder
+ public static class PojoV2 {
+ private String field1;
+ private int field2;
+ private Double field3;
+ }
+
+ @Data
+ @Builder
public static final class PojoKey {
private String field1;
}
@@ -170,6 +178,82 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
getSinkInfoNotFound(sinkName);
}
+ @Test(groups = {"sink"})
+ public void testGenericObjectSinkWithSchemaChange() throws Exception {
+ // Produces two messages to the same topic with different AVRO schemas (Pojo, then PojoV2).
+ // TestGenericObjectSink throws if a record's schema type or schema definition differs from the
+ // "expectedType"/"expectedSchemaDefinition" properties attached to each message, which shows up
+ // here as a sink exception (or restart) instead of records written to the sink.
+
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(container.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();
+
+ final int numRecords = 2;
+
+ String sinkName = "genericobject-sink";
+ String topicName = "test-genericobject-sink-schema-change";
+
+ submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
+ // get sink info
+ getSinkInfoSuccess(sinkName);
+ getSinkStatus(sinkName);
+
+ @Cleanup Producer<byte[]> producer = client.newProducer()
+ .topic(topicName)
+ .create();
+ Schema<Pojo> schemav1 = Schema.AVRO(Pojo.class);
+ Pojo record1 = Pojo.builder().field1("foo").field2(23).build();
+ producer.newMessage(schemav1)
+ .value(record1)
+ .property("expectedType", schemav1.getSchemaInfo().getType().toString())
+ .property("expectedSchemaDefinition", schemav1.getSchemaInfo().getSchemaDefinition())
+ .property("recordNumber", "1")
+ .send();
+
+ Schema<PojoV2> schemav2 = Schema.AVRO(PojoV2.class);
+ PojoV2 record2 = PojoV2.builder().field1("foo").field2(23).field3(42.5).build();
+ producer.newMessage(schemav2)
+ .value(record2)
+ .property("expectedType", schemav2.getSchemaInfo().getType().toString())
+ .property("expectedSchemaDefinition", schemav2.getSchemaInfo().getSchemaDefinition())
+ .property("recordNumber", "2")
+ .send();
+
+ // wait until the sink has processed all records, or has reported an exception or restart
+
+ try {
+ log.info("waiting for sink {}", sinkName);
+
+ for (int i = 0; i < 120; i++) {
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ log.info("sink {} status {}", sinkName, status);
+ assertEquals(status.getInstances().size(), 1);
+ SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+ if (instance.getStatus().numWrittenToSink >= numRecords
+ || instance.getStatus().numSinkExceptions > 0
+ || instance.getStatus().numSystemExceptions > 0
+ || instance.getStatus().numRestarts > 0) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ log.info("sink {} status {}", sinkName, status);
+ assertEquals(status.getInstances().size(), 1);
+ SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+ assertTrue(instance.getStatus().numWrittenToSink >= numRecords,
+ "sink wrote fewer than " + numRecords + " records: " + status);
+ // assertEquals (rather than assertTrue(x == 0)) reports the actual count on failure
+ assertEquals(instance.getStatus().numSinkExceptions, 0);
+ assertEquals(instance.getStatus().numSystemExceptions, 0);
+ // the wait loop above stops early on a restart, so a restart must fail the test as well
+ assertEquals(instance.getStatus().numRestarts, 0);
+ log.info("sink {} is okay", sinkName);
+ } finally {
+ dumpFunctionLogs(sinkName);
+ }
+
+ deleteSink(sinkName);
+ getSinkInfoNotFound(sinkName);
+ }
+
private void submitSinkConnector(String sinkName,
String inputTopicName,
String className,