You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:23 UTC

[pulsar] 29/46: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload (#10248)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ff5924492d784fb69d795d694e3e58183dd989c9
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Apr 19 14:18:29 2021 +0200

    Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload (#10248)
    
    (cherry picked from commit 54523bbff9b2b2b861409a93cfb3712bfdb73f27)
---
 .../org/apache/pulsar/client/impl/MessageImpl.java |  11 +-
 .../client/impl/schema/AutoConsumeSchema.java      |  72 ++++++++----
 .../integration/io/TestGenericObjectSink.java      |   4 +-
 .../io/PulsarGenericObjectSinkTest.java            | 123 ++++++++++-----------
 .../suites/PulsarStandaloneTestSuite.java          |   1 +
 .../topologies/PulsarStandaloneTestBase.java       |  15 +++
 6 files changed, 137 insertions(+), 89 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 3cb2235..a148677 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 public class MessageImpl<T> implements Message<T> {
@@ -279,10 +280,18 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    private SchemaInfo getSchemaInfo() { // returns schema.getSchemaInfo(), after a fetch for AutoConsumeSchema
+        if (schema instanceof AutoConsumeSchema) {
+            ((AutoConsumeSchema) schema).fetchSchemaIfNeeded(); // loads the schema if it is still missing
+        }
+        return schema.getSchemaInfo();
+    }
+
     @Override
     public T getValue() {
         checkNotNull(msgMetadataBuilder);
-        if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+        SchemaInfo schemaInfo = getSchemaInfo();
+        if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) {
             if (schema.supportSchemaVersioning()) {
                 return getKeyValueBySchemaVersion();
             } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 41b1260..07c807c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl.schema;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
@@ -30,7 +29,11 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -77,27 +80,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
 
     @Override
     public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        if (schema == null) {
-            SchemaInfo schemaInfo = null;
-            try {
-                schemaInfo = schemaInfoProvider.getLatestSchema().get();
-                if (schemaInfo == null) {
-                    // schemaless topic
-                    schemaInfo = BytesSchema.of().getSchemaInfo();
-                }
-            } catch (InterruptedException | ExecutionException e ) {
-                if (e instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
-                }
-                log.error("Can't get last schema for topic {} use AutoConsumeSchema", topicName);
-                throw new SchemaSerializationException(e.getCause());
-            }
-            // schemaInfo null means that there is no schema attached to the topic.
-            schema = generateSchema(schemaInfo);
-            schema.setSchemaInfoProvider(schemaInfoProvider);
-            log.info("Configure {} schema for topic {} : {}",
-                    componentName, topicName, schemaInfo.getSchemaDefinition());
-        }
+        fetchSchemaIfNeeded();
         ensureSchemaInitialized();
         return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
     }
@@ -157,7 +140,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         }
     }
 
-    private Schema<?> generateSchema(SchemaInfo schemaInfo) {
+    private static Schema<?> generateSchema(SchemaInfo schemaInfo) {
         // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
         // to decode the messages.
         final boolean useProvidedSchemaAsReaderSchema = false;
@@ -260,4 +243,47 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     public Schema<?> getInternalSchema() {
         return schema;
     }
+
+    /**
+     * It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
+     * We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
+     * places and we will introduce lots of deadlocks.
+     */
+    public void fetchSchemaIfNeeded() throws SchemaSerializationException {
+        if (schema == null) {
+            if (schemaInfoProvider == null) {
+                throw new SchemaSerializationException("Can't get accurate schema information for topic " + topicName +
+                                                "using AutoConsumeSchema because SchemaInfoProvider is not set yet");
+            } else {
+                SchemaInfo schemaInfo = null;
+                try {
+                    schemaInfo = schemaInfoProvider.getLatestSchema().get();
+                    if (schemaInfo == null) {
+                        // schemaless topic
+                        schemaInfo = BytesSchema.of().getSchemaInfo();
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    if (e instanceof InterruptedException) {
+                        Thread.currentThread().interrupt();
+                    }
+                    log.error("Can't get last schema for topic {} using AutoConsumeSchema", topicName);
+                    throw new SchemaSerializationException(e.getCause());
+                }
+                // schemaInfo null means that there is no schema attached to the topic.
+                schema = generateSchema(schemaInfo);
+                schema.setSchemaInfoProvider(schemaInfoProvider);
+                log.info("Configure {} schema for topic {} : {}",
+                        componentName, topicName, schemaInfo.getSchemaDefinition());
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        // the wrapped schema type can only be reported once the schema has been resolved
+        if (schema == null || schema.getSchemaInfo() == null) {
+            return "AUTO_CONSUME(uninitialized)";
+        }
+        return "AUTO_CONSUME(schematype=" + schema.getSchemaInfo().getType() + ")";
+    }
 }
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index c0c4ac2..b7645ba 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -36,7 +36,7 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
     }
 
     public void write(Record<GenericObject> record) {
-
+        log.info("topic {}", record.getTopicName().orElse(null));
         log.info("properties {}", record.getProperties());
         log.info("received record {} {}", record, record.getClass());
         log.info("schema {}", record.getSchema());
@@ -65,6 +65,8 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
         log.info("value {}", record.getValue());
         log.info("value schema type {}", record.getValue().getSchemaType());
         log.info("value native object {}", record.getValue().getNativeObject());
+
+        record.ack();
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index 1e5fb74..c810b1b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io;
 import lombok.Builder;
 import lombok.Cleanup;
 import lombok.Data;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
@@ -43,6 +45,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
 import static org.testng.Assert.assertEquals;
@@ -55,15 +59,14 @@ import static org.testng.Assert.fail;
 @Slf4j
 public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
 
+    @Getter
     private static final class SinkSpec<T> {
         final String outputTopicName;
-        final String sinkName;
         final Schema<T> schema;
         final T testValue;
 
-        public SinkSpec(String outputTopicName, String sinkName, Schema<T> schema, T testValue) {
+        public SinkSpec(String outputTopicName, Schema<T> schema, T testValue) {
             this.outputTopicName = outputTopicName;
-            this.sinkName = sinkName;
             this.schema = schema;
             this.testValue = testValue;
         }
@@ -89,90 +92,82 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
                 .serviceUrl(container.getPlainTextServiceUrl())
                 .build();
 
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();
+
         // we are not using a parametrized test in order to save resources
-        // we create N sinks, send the records and verify each sink
-        // sinks execution happens in parallel
+        // we create one sink that listens on multiple topics, send the records and verify the sink
         List<SinkSpec> specs = Arrays.asList(
-                new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
-                new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
-                new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"),
+                new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
+                new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
-        for (SinkSpec spec : specs) {
-            submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
-            // get sink info
-            getSinkInfoSuccess(spec.sinkName);
-            getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
+        final int numRecordsPerTopic = 2;
+
+        String sinkName = "genericobject-sink";
+        String topicNames = specs
+                .stream()
+                .map(SinkSpec::getOutputTopicName)
+                .collect(Collectors.joining(","));
+        submitSinkConnector(sinkName, topicNames, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
+        // get sink info
+        getSinkInfoSuccess(sinkName);
+        getSinkStatus(sinkName);
+
 
         for (SinkSpec spec : specs) {
+
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
-            for (int i = 0; i < numRecords; i++) {
+            for (int i = 0; i < numRecordsPerTopic; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
         }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
-
-                    SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                    log.info("sink {} status {}", spec.sinkName, status);
-                    assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+        // wait that sink processed all records without errors
+
+        try {
+            log.info("waiting for sink {}", sinkName);
+
+            for (int i = 0; i < 120; i++) {
+                SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+                log.info("sink {} status {}", sinkName, status);
+                assertEquals(status.getInstances().size(), 1);
+                SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                if (instance.getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size()
+                    || instance.getStatus().numSinkExceptions > 0
+                    || instance.getStatus().numSystemExceptions > 0
+                    || instance.getStatus().numRestarts > 0) {
+                    break;
                 }
+                Thread.sleep(1000);
             }
-        }
-
 
-        for (SinkSpec spec : specs) {
-            deleteSink(spec.sinkName);
-            getSinkInfoNotFound(spec.sinkName);
+            SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+            log.info("sink {} status {}", sinkName, status);
+            assertEquals(status.getInstances().size(), 1);
+            assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size());
+            assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
+            assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
+            log.info("sink {} is okay", sinkName);
+        } finally {
+            dumpFunctionLogs(sinkName);
         }
-    }
 
-    private void dumpSinkLogs(SinkSpec spec) {
-        try {
-            String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
-            String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
-                return IOUtils.toString(inputStream, "utf-8");
-            });
-            log.info("Sink {} logs {}", spec.sinkName, logs);
-        } catch (Throwable err) {
-            log.info("Cannot download sink {} logs", spec.sinkName, err);
-        }
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
     }
 
     private void submitSinkConnector(String sinkName,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
index 4e2e601..93e2e07 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
@@ -40,4 +40,5 @@ public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implemen
     public String getTestName() {
         return "pulsar-standalone-suite";
     }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 03132f5..459837b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.topologies;
 import static org.testng.Assert.assertEquals;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testcontainers.containers.Network;
@@ -90,4 +91,18 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
         network.close();
     }
 
+
+
+    protected void dumpFunctionLogs(String name) {
+        try {
+            String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
+            String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
+                return IOUtils.toString(inputStream, "utf-8");
+            });
+            log.info("Function {} logs {}", name, logs);
+        } catch (Throwable err) {
+            log.info("Cannot download {} logs", name, err);
+        }
+    }
+
 }