You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:23 UTC
[pulsar] 29/46: Java Client: MessageImpl - ensure that
AutoConsumeSchema downloaded the schema before decoding the payload
(#10248)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ff5924492d784fb69d795d694e3e58183dd989c9
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Apr 19 14:18:29 2021 +0200
Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload (#10248)
(cherry picked from commit 54523bbff9b2b2b861409a93cfb3712bfdb73f27)
---
.../org/apache/pulsar/client/impl/MessageImpl.java | 11 +-
.../client/impl/schema/AutoConsumeSchema.java | 72 ++++++++----
.../integration/io/TestGenericObjectSink.java | 4 +-
.../io/PulsarGenericObjectSinkTest.java | 123 ++++++++++-----------
.../suites/PulsarStandaloneTestSuite.java | 1 +
.../topologies/PulsarStandaloneTestBase.java | 15 +++
6 files changed, 137 insertions(+), 89 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 3cb2235..a148677 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
public class MessageImpl<T> implements Message<T> {
@@ -279,10 +280,18 @@ public class MessageImpl<T> implements Message<T> {
}
}
+ private SchemaInfo getSchemaInfo() {
+ if (schema instanceof AutoConsumeSchema) {
+ ((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
+ }
+ return schema.getSchemaInfo();
+ }
+
@Override
public T getValue() {
checkNotNull(msgMetadataBuilder);
- if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+ SchemaInfo schemaInfo = getSchemaInfo();
+ if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 41b1260..07c807c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
@@ -30,7 +29,11 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static com.google.common.base.Preconditions.checkState;
@@ -77,27 +80,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- if (schema == null) {
- SchemaInfo schemaInfo = null;
- try {
- schemaInfo = schemaInfoProvider.getLatestSchema().get();
- if (schemaInfo == null) {
- // schemaless topic
- schemaInfo = BytesSchema.of().getSchemaInfo();
- }
- } catch (InterruptedException | ExecutionException e ) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- log.error("Can't get last schema for topic {} use AutoConsumeSchema", topicName);
- throw new SchemaSerializationException(e.getCause());
- }
- // schemaInfo null means that there is no schema attached to the topic.
- schema = generateSchema(schemaInfo);
- schema.setSchemaInfoProvider(schemaInfoProvider);
- log.info("Configure {} schema for topic {} : {}",
- componentName, topicName, schemaInfo.getSchemaDefinition());
- }
+ fetchSchemaIfNeeded();
ensureSchemaInitialized();
return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
}
@@ -157,7 +140,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
}
- private Schema<?> generateSchema(SchemaInfo schemaInfo) {
+ private static Schema<?> generateSchema(SchemaInfo schemaInfo) {
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
final boolean useProvidedSchemaAsReaderSchema = false;
@@ -260,4 +243,47 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
public Schema<?> getInternalSchema() {
return schema;
}
+
+ /**
+ * It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
+ * We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
+ * places and we will introduce lots of deadlocks.
+ */
+ public void fetchSchemaIfNeeded() throws SchemaSerializationException {
+ if (schema == null) {
+ if (schemaInfoProvider == null) {
+ throw new SchemaSerializationException("Can't get accurate schema information for topic " + topicName +
+ "using AutoConsumeSchema because SchemaInfoProvider is not set yet");
+ } else {
+ SchemaInfo schemaInfo = null;
+ try {
+ schemaInfo = schemaInfoProvider.getLatestSchema().get();
+ if (schemaInfo == null) {
+ // schemaless topic
+ schemaInfo = BytesSchema.of().getSchemaInfo();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ log.error("Can't get last schema for topic {} using AutoConsumeSchema", topicName);
+ throw new SchemaSerializationException(e.getCause());
+ }
+ // schemaInfo null means that there is no schema attached to the topic.
+ schema = generateSchema(schemaInfo);
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ log.info("Configure {} schema for topic {} : {}",
+ componentName, topicName, schemaInfo.getSchemaDefinition());
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (schema != null && schema.getSchemaInfo() != null) {
+ return "AUTO_CONSUME(schematype=" + schema.getSchemaInfo().getType() + ")";
+ } else {
+ return "AUTO_CONSUME(uninitialized)";
+ }
+ }
}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index c0c4ac2..b7645ba 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -36,7 +36,7 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
}
public void write(Record<GenericObject> record) {
-
+ log.info("topic {}", record.getTopicName().orElse(null));
log.info("properties {}", record.getProperties());
log.info("received record {} {}", record, record.getClass());
log.info("schema {}", record.getSchema());
@@ -65,6 +65,8 @@ public class TestGenericObjectSink implements Sink<GenericObject> {
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());
+
+ record.ack();
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index 1e5fb74..c810b1b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io;
import lombok.Builder;
import lombok.Cleanup;
import lombok.Data;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
@@ -43,6 +45,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.testng.Assert.assertEquals;
@@ -55,15 +59,14 @@ import static org.testng.Assert.fail;
@Slf4j
public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
+ @Getter
private static final class SinkSpec<T> {
final String outputTopicName;
- final String sinkName;
final Schema<T> schema;
final T testValue;
- public SinkSpec(String outputTopicName, String sinkName, Schema<T> schema, T testValue) {
+ public SinkSpec(String outputTopicName, Schema<T> schema, T testValue) {
this.outputTopicName = outputTopicName;
- this.sinkName = sinkName;
this.schema = schema;
this.testValue = testValue;
}
@@ -89,90 +92,82 @@ public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {
.serviceUrl(container.getPlainTextServiceUrl())
.build();
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();
+
// we are not using a parametrized test in order to save resources
- // we create N sinks, send the records and verify each sink
- // sinks execution happens in parallel
+ // we create one sink that listens on multiple topics, send the records and verify the sink
List<SinkSpec> specs = Arrays.asList(
- new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
- new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
- new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"),
+ new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
+ new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
- new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
- Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+ new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+ new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
);
- // submit all sinks
- for (SinkSpec spec : specs) {
- submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
- }
- // check all sinks
- for (SinkSpec spec : specs) {
- // get sink info
- getSinkInfoSuccess(spec.sinkName);
- getSinkStatus(spec.sinkName);
- }
- final int numRecords = 10;
+ final int numRecordsPerTopic = 2;
+
+ String sinkName = "genericobject-sink";
+ String topicNames = specs
+ .stream()
+ .map(SinkSpec::getOutputTopicName)
+ .collect(Collectors.joining(","));
+ submitSinkConnector(sinkName, topicNames, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
+ // get sink info
+ getSinkInfoSuccess(sinkName);
+ getSinkStatus(sinkName);
+
for (SinkSpec spec : specs) {
+
@Cleanup Producer<Object> producer = client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
- for (int i = 0; i < numRecords; i++) {
+ for (int i = 0; i < numRecordsPerTopic; i++) {
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+ .property("recordNumber", i + "")
.send();
log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
}
}
- // wait that all sinks processed all records without errors
- try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
- for (SinkSpec spec : specs) {
- try {
- log.info("waiting for sink {}", spec.sinkName);
- for (int i = 0; i < 120; i++) {
- SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
- log.info("sink {} status {}", spec.sinkName, status);
- assertEquals(status.getInstances().size(), 1);
- SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
- if (instance.getStatus().numWrittenToSink >= numRecords) {
- break;
- }
- assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
- Thread.sleep(1000);
- }
-
- SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
- log.info("sink {} status {}", spec.sinkName, status);
- assertEquals(status.getInstances().size(), 1);
- assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
- assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
- assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
- log.info("sink {} is okay", spec.sinkName);
- } finally {
- dumpSinkLogs(spec);
+ // wait that sink processed all records without errors
+
+ try {
+ log.info("waiting for sink {}", sinkName);
+
+ for (int i = 0; i < 120; i++) {
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ log.info("sink {} status {}", sinkName, status);
+ assertEquals(status.getInstances().size(), 1);
+ SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+ if (instance.getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size()
+ || instance.getStatus().numSinkExceptions > 0
+ || instance.getStatus().numSystemExceptions > 0
+ || instance.getStatus().numRestarts > 0) {
+ break;
}
+ Thread.sleep(1000);
}
- }
-
- for (SinkSpec spec : specs) {
- deleteSink(spec.sinkName);
- getSinkInfoNotFound(spec.sinkName);
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ log.info("sink {} status {}", sinkName, status);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size());
+ assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
+ assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
+ log.info("sink {} is okay", sinkName);
+ } finally {
+ dumpFunctionLogs(sinkName);
}
- }
- private void dumpSinkLogs(SinkSpec spec) {
- try {
- String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
- String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
- return IOUtils.toString(inputStream, "utf-8");
- });
- log.info("Sink {} logs {}", spec.sinkName, logs);
- } catch (Throwable err) {
- log.info("Cannot download sink {} logs", spec.sinkName, err);
- }
+ deleteSink(sinkName);
+ getSinkInfoNotFound(sinkName);
}
private void submitSinkConnector(String sinkName,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
index 4e2e601..93e2e07 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
@@ -40,4 +40,5 @@ public class PulsarStandaloneTestSuite extends PulsarStandaloneTestBase implemen
public String getTestName() {
return "pulsar-standalone-suite";
}
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 03132f5..459837b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.topologies;
import static org.testng.Assert.assertEquals;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testcontainers.containers.Network;
@@ -90,4 +91,18 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
network.close();
}
+
+
+ protected void dumpFunctionLogs(String name) {
+ try {
+ String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
+ String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
+ return IOUtils.toString(inputStream, "utf-8");
+ });
+ log.info("Function {} logs {}", name, logs);
+ } catch (Throwable err) {
+ log.info("Cannot download {} logs", name, err);
+ }
+ }
+
}