You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:22 UTC
[pulsar] 28/46: Sink<GenericObject> unwrap internal
AutoConsumeSchema and allow to handle topics with KeyValue schema (#10211)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec3ad85d1b88dae36423c38a1a86651e6fd63c28
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Sat Apr 17 10:14:21 2021 +0200
Sink<GenericObject> unwrap internal AutoConsumeSchema and allow to handle topics with KeyValue schema (#10211)
---
.../pulsar/client/impl/TopicMessageImpl.java | 9 +++
.../pulsar/functions/instance/SinkRecord.java | 8 ++-
.../pulsar/functions/source/PulsarSource.java | 4 ++
.../integration/io/TestGenericObjectSink.java | 29 +++++++--
.../io/PulsarGenericObjectSinkTest.java | 70 ++++++++++++++++++----
5 files changed, 102 insertions(+), 18 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index fb62ba3..00869f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.EncryptionContext;
public class TopicMessageImpl<T> implements Message<T> {
@@ -171,4 +172,12 @@ public class TopicMessageImpl<T> implements Message<T> {
public Message<T> getMessage() {
return msg;
}
+
+ public Schema<T> getSchema() {
+ if (this.msg instanceof MessageImpl) {
+ MessageImpl message = (MessageImpl) this.msg;
+ return message.getSchema();
+ }
+ return null;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index dc1bfd1..38c7036 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -27,6 +27,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
@@ -95,7 +96,12 @@ public class SinkRecord<T> implements Record<T> {
}
if (sourceRecord.getSchema() != null) {
- return sourceRecord.getSchema();
+ // unwrap actual schema
+ Schema<T> schema = sourceRecord.getSchema();
+ if (schema instanceof AutoConsumeSchema) {
+ schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
+ }
+ return schema;
}
if (sourceRecord instanceof KVRecord) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 235c6ca..e1ec9f6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.util.Reflections;
@@ -132,6 +133,9 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
if (message instanceof MessageImpl) {
MessageImpl impl = (MessageImpl) message;
schema = impl.getSchema();
+ } else if (message instanceof TopicMessageImpl) {
+ TopicMessageImpl impl = (TopicMessageImpl) message;
+ schema = impl.getSchema();
}
Record<T> record = PulsarRecord.<T>builder()
.message(message)
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index 6b0da0b..c0c4ac2 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -20,18 +20,13 @@ package org.apache.pulsar.tests.integration.io;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
-
-import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
@Slf4j
public class TestGenericObjectSink implements Sink<GenericObject> {
@@ -41,10 +36,32 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
}
public void write(Record<GenericObject> record) {
+
+ log.info("properties {}", record.getProperties());
log.info("received record {} {}", record, record.getClass());
log.info("schema {}", record.getSchema());
log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null));
+ String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING");
+ if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) {
+ throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType);
+ }
+
+ log.info("value {}", record.getValue());
+ log.info("value schema type {}", record.getValue().getSchemaType());
+ log.info("value native object {}", record.getValue().getNativeObject());
+
+ if (record.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+ // assert that we are able to access the schema (leads to ClassCastException if there is a problem)
+ KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema();
+ log.info("key schema type {}", kvSchema.getKeySchema());
+ log.info("value schema type {}", kvSchema.getValueSchema());
+ log.info("key encoding {}", kvSchema.getKeyValueEncodingType());
+
+ KeyValue keyValue = (KeyValue) record.getValue().getNativeObject();
+ log.info("kvkey {}", keyValue.getKey());
+ log.info("kvvalue {}", keyValue.getValue());
+ }
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index 25f1776..1e5fb74 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -22,8 +22,10 @@ import lombok.Builder;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -36,8 +38,11 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.testng.Assert.assertEquals;
@@ -71,16 +76,29 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
private int field2;
}
+ @Data
+ @Builder
+ public static final class PojoKey {
+ private String field1;
+ }
+
@Test(groups = {"sink"})
public void testGenericObjectSink() throws Exception {
+
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(container.getPlainTextServiceUrl())
+ .build();
+
// we are not using a parametrized test in order to save resources
// we create N sinks, send the records and verify each sink
// sinks execution happens in parallel
List<SinkSpec> specs = Arrays.asList(
new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
- new SinkSpec("test-kv-sink-input-int-" + randomName(8), "test-kv-sink-int-" + randomName(8), Schema.INT32, 123),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
- new SinkSpec("test-kv-sink-input-json-" + randomName(8), "test-kv-sink-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build())
+ new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
+ new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
);
// submit all sinks
for (SinkSpec spec : specs) {
@@ -93,10 +111,6 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
getSinkStatus(spec.sinkName);
}
- @Cleanup PulsarClient client = PulsarClient.builder()
- .serviceUrl(container.getPlainTextServiceUrl())
- .build();
-
final int numRecords = 10;
for (SinkSpec spec : specs) {
@@ -104,21 +118,41 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
- producer.send(spec.testValue);
+ MessageId messageId = producer.newMessage()
+ .value(spec.testValue)
+ .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+ .send();
+ log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
}
}
// wait that all sinks processed all records without errors
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-
for (SinkSpec spec : specs) {
- Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ try {
+ log.info("waiting for sink {}", spec.sinkName);
+ for (int i = 0; i < 120; i++) {
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
+ log.info("sink {} status {}", spec.sinkName, status);
+ assertEquals(status.getInstances().size(), 1);
+ SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+ if (instance.getStatus().numWrittenToSink >= numRecords) {
+ break;
+ }
+ assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+ Thread.sleep(1000);
+ }
+
SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
+ log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
- assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar >= numRecords);
+ assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
- });
+ log.info("sink {} is okay", spec.sinkName);
+ } finally {
+ dumpSinkLogs(spec);
+ }
}
}
@@ -129,6 +163,18 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
}
}
+ private void dumpSinkLogs(SinkSpec spec) {
+ try {
+ String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
+ String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
+ return IOUtils.toString(inputStream, "utf-8");
+ });
+ log.info("Sink {} logs {}", spec.sinkName, logs);
+ } catch (Throwable err) {
+ log.info("Cannot download sink {} logs", spec.sinkName, err);
+ }
+ }
+
private void submitSinkConnector(String sinkName,
String inputTopicName,
String className,
@@ -169,6 +215,8 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
"--namespace", "default",
"--name", sinkName
);
+ log.info(result.getStdout());
+ log.info(result.getStderr());
assertTrue(result.getStdout().contains("\"running\" : true"));
}