You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:22 UTC

[pulsar] 28/46: Sink unwrap internal AutoConsumeSchema and allow to handle topics with KeyValue schema (#10211)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ec3ad85d1b88dae36423c38a1a86651e6fd63c28
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Sat Apr 17 10:14:21 2021 +0200

    Sink<GenericObject> unwrap internal AutoConsumeSchema and allow to handle topics with KeyValue schema (#10211)
---
 .../pulsar/client/impl/TopicMessageImpl.java       |  9 +++
 .../pulsar/functions/instance/SinkRecord.java      |  8 ++-
 .../pulsar/functions/source/PulsarSource.java      |  4 ++
 .../integration/io/TestGenericObjectSink.java      | 29 +++++++--
 .../io/PulsarGenericObjectSinkTest.java            | 70 ++++++++++++++++++----
 5 files changed, 102 insertions(+), 18 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index fb62ba3..00869f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.EncryptionContext;
 
 public class TopicMessageImpl<T> implements Message<T> {
@@ -171,4 +172,12 @@ public class TopicMessageImpl<T> implements Message<T> {
     public Message<T> getMessage() {
         return msg;
     }
+
+    /** Returns the schema of the wrapped message, or {@code null} when it is not a {@code MessageImpl}. */
+    public Schema<T> getSchema() {
+        if (this.msg instanceof MessageImpl) {
+            return ((MessageImpl<T>) this.msg).getSchema();
+        }
+        return null;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index dc1bfd1..38c7036 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -27,6 +27,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
@@ -95,7 +96,12 @@ public class SinkRecord<T> implements Record<T> {
         }
 
         if (sourceRecord.getSchema() != null) {
-            return sourceRecord.getSchema();
+            // unwrap actual schema
+            Schema<T> schema =  sourceRecord.getSchema();
+            if (schema instanceof AutoConsumeSchema) {
+                schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
+            }
+            return schema;
         }
 
         if (sourceRecord instanceof KVRecord) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 235c6ca..e1ec9f6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.util.Reflections;
@@ -132,6 +133,9 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         if (message instanceof MessageImpl) {
             MessageImpl impl = (MessageImpl) message;
             schema = impl.getSchema();
+        } else if (message instanceof TopicMessageImpl) {
+            TopicMessageImpl impl = (TopicMessageImpl) message;
+            schema = impl.getSchema();
         }
         Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index 6b0da0b..c0c4ac2 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -20,18 +20,13 @@ package org.apache.pulsar.tests.integration.io;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
-
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 @Slf4j
 public class TestGenericObjectSink implements Sink<GenericObject> {
@@ -41,10 +36,32 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
     }
 
     public void write(Record<GenericObject> record) {
+
+        log.info("properties {}", record.getProperties());
         log.info("received record {} {}", record, record.getClass());
         log.info("schema {}", record.getSchema());
         log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null));
 
+        String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING");
+        if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) {
+            throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType);
+        }
+
+        log.info("value {}", record.getValue());
+        log.info("value schema type {}", record.getValue().getSchemaType());
+        log.info("value native object {}", record.getValue().getNativeObject());
+
+        if (record.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            // assert that we are able to access the schema (leads to ClassCastException if there is a problem)
+            KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema();
+            log.info("key schema type {}", kvSchema.getKeySchema());
+            log.info("value schema type {}", kvSchema.getValueSchema());
+            log.info("key encoding {}", kvSchema.getKeyValueEncodingType());
+
+            KeyValue keyValue = (KeyValue) record.getValue().getNativeObject();
+            log.info("kvkey {}", keyValue.getKey());
+            log.info("kvvalue {}", keyValue.getValue());
+        }
         log.info("value {}", record.getValue());
         log.info("value schema type {}", record.getValue().getSchemaType());
         log.info("value native object {}", record.getValue().getNativeObject());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index 25f1776..1e5fb74 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -22,8 +22,10 @@ import lombok.Builder;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -36,8 +38,11 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
 import static org.testng.Assert.assertEquals;
@@ -71,16 +76,29 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
         private int field2;
     }
 
+    @Data
+    @Builder
+    public static final class PojoKey {
+        private String field1;
+    }
+
     @Test(groups = {"sink"})
     public void testGenericObjectSink() throws Exception {
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+
         // we are not using a parametrized test in order to save resources
         // we create N sinks, send the records and verify each sink
         // sinks execution happens in parallel
         List<SinkSpec> specs = Arrays.asList(
                 new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
-                new SinkSpec("test-kv-sink-input-int-" + randomName(8), "test-kv-sink-int-" + randomName(8), Schema.INT32, 123),
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
-                new SinkSpec("test-kv-sink-input-json-" + randomName(8), "test-kv-sink-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build())
+                new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-kv-avro-json-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
         // submit all sinks
         for (SinkSpec spec : specs) {
@@ -93,10 +111,6 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
             getSinkStatus(spec.sinkName);
         }
 
-        @Cleanup PulsarClient client = PulsarClient.builder()
-                .serviceUrl(container.getPlainTextServiceUrl())
-                .build();
-
         final int numRecords = 10;
 
         for (SinkSpec spec : specs) {
@@ -104,21 +118,41 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
-                producer.send(spec.testValue);
+                MessageId messageId = producer.newMessage()
+                        .value(spec.testValue)
+                        .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .send();
+                log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
         }
 
         // wait that all sinks processed all records without errors
         try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-
             for (SinkSpec spec : specs) {
-                Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                try {
+                    log.info("waiting for sink {}", spec.sinkName);
+                    for (int i = 0; i < 120; i++) {
+                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
+                        log.info("sink {} status {}", spec.sinkName, status);
+                        assertEquals(status.getInstances().size(), 1);
+                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                        if (instance.getStatus().numWrittenToSink >= numRecords) {
+                            break;
+                        }
+                        assertTrue(instance.getStatus().numRestarts <= 1, "Sink was restarted, probably an error occurred");
+                        Thread.sleep(1000); // not done yet: poll again in one second (at most 120 attempts)
+                    }
+
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
+                    log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar >= numRecords);
+                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
                     assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
                     assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                });
+                    log.info("sink {} is okay", spec.sinkName);
+                } finally {
+                    dumpSinkLogs(spec);
+                }
             }
         }
 
@@ -129,6 +163,18 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
         }
     }
 
+    // Best-effort dump of the sink instance log; invoked from a finally block, so failures are logged, not thrown.
+    private void dumpSinkLogs(SinkSpec spec) {
+        try {
+            String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
+            String logs = container.<String>copyFileFromContainer(logFile,
+                    inputStream -> IOUtils.toString(inputStream, StandardCharsets.UTF_8));
+            log.info("Sink {} logs {}", spec.sinkName, logs);
+        } catch (Throwable err) {
+            log.info("Cannot download sink {} logs", spec.sinkName, err);
+        }
+    }
+
     private void submitSinkConnector(String sinkName,
                                      String inputTopicName,
                                      String className,
@@ -169,6 +215,8 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
                 "--namespace", "default",
                 "--name", sinkName
         );
+        log.info(result.getStdout());
+        log.info(result.getStderr());
         assertTrue(result.getStdout().contains("\"running\" : true"));
     }