Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/24 10:00:11 UTC

[pulsar] branch branch-2.9 updated: [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4f65f65c6fe [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622)
4f65f65c6fe is described below

commit 4f65f65c6fe6f7e97e83a13d038c6dbf28c3f8c6
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu May 19 11:49:25 2022 +0800

    [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622)
    
    ### Motivation
    
    When I tried to consume a topic via a consumer with an Avro schema
    while the messages on the topic were produced by a producer without a
    schema, the consumption failed. This is because
    `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check
    whether `schemaVersion` is an empty byte array. If it is, a
    `BytesSchemaVersion` of an empty array is passed to `cache.get` and
    then to `loadSchema`.
    
    https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98
    
    However, `LookupService#getSchema` cannot accept an empty byte array as
    the version, so `loadSchema` failed.
    
    The root cause is that the schema version was set unexpectedly when
    messages were sent by a producer without a schema. On the broker
    side, the returned schema version is never null; an empty array means
    the message doesn't have a schema. On the Java client side, however,
    the empty byte array is treated as an existing schema and the schema
    version field is set. When a consumer receives the message, it tries
    to load the schema whose version is an empty array.
    
    ### Modifications
    
    - When a producer receives a response whose schema version is an empty
      byte array, just ignore it.
    - Make `MessageImpl#getSchemaVersion` return null if the schema version
      is an empty byte array so that the consumer can consume messages
      produced by older version producers without schema. Also return the
      internal schema from `getReaderSchema` when `getSchemaVersion`
      returns null.
    - Fix the existing tests. Since a producer without schema won't set
      the `schema_version` field after this patch, some tests that rely on
      precise stats are modified.
    - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that
      messages without schema are compatible with the schema.
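
    The normalization described in the first two items can be sketched
    as follows. This is a minimal, standalone illustration, not the
    client code itself: `SchemaVersionSketch` and `normalize` are
    hypothetical names, and the only assumption carried over from the
    patch is that a null or empty version means "no schema".

    ```java
    import java.util.Arrays;

    // Hypothetical helper for illustration; not part of the Pulsar client API.
    final class SchemaVersionSketch {
        private SchemaVersionSketch() {}

        /** Returns null when the version is null or empty, else the same array. */
        static byte[] normalize(byte[] schemaVersion) {
            return (schemaVersion == null || schemaVersion.length == 0) ? null : schemaVersion;
        }

        public static void main(String[] args) {
            // An empty version is reported as "no schema version".
            System.out.println(normalize(new byte[0]) == null);
            // A non-empty version is passed through unchanged.
            System.out.println(Arrays.toString(normalize(new byte[] {0, 1})));
        }
    }
    ```

    Applying the same rule on both the producer response and the
    message accessor keeps an empty array from ever reaching the schema
    lookup.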
    
    This patch also modifies the existing behavior when
    `schemaValidationEnforced` is false and messages are produced by a
    producer without schema and consumed by a consumer with schema.
    
    1. If the message is incompatible with the schema
       - Before: `getSchemaVersion` returns an empty array and `getValue`
         fails with `SerializationException`:
    
         > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
    
       - After: `getSchemaVersion` returns `null` and `getValue` fails with
         `SchemaSerializationException`.
    
    2. Otherwise (the message is compatible with the schema)
       - Before: `getSchemaVersion` returns an empty array and `getValue`
         fails with `SerializationException`.
       - After: `getSchemaVersion` returns `null` and `getValue` returns the
         correctly decoded object.
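
    The "After" behavior in both cases follows from how the reader
    schema is chosen once the version is null. A rough sketch, under
    the assumption that the consumer's own schema is used whenever no
    version is present (`ReaderSchemaSketch`, `readerSchema` and the
    `atSchemaVersion` function are hypothetical stand-ins, with `S`
    standing for any schema type):

    ```java
    import java.util.Optional;
    import java.util.function.Function;

    // Hypothetical stand-in for the reader-schema selection; not the client code.
    final class ReaderSchemaSketch {
        private ReaderSchemaSketch() {}

        static <S> Optional<S> readerSchema(S consumerSchema, byte[] schemaVersion,
                                            Function<byte[], S> atSchemaVersion) {
            if (consumerSchema == null) {
                return Optional.empty();
            }
            // No version on the message: fall back to the consumer's own schema
            // instead of looking one up by version.
            if (schemaVersion == null || schemaVersion.length == 0) {
                return Optional.of(consumerSchema);
            }
            // A real version: resolve the schema for that version.
            return Optional.of(atSchemaVersion.apply(schemaVersion));
        }

        public static void main(String[] args) {
            Function<byte[], String> lookup = v -> "schema@v" + v[0];
            System.out.println(readerSchema("consumer-schema", null, lookup).get());
            System.out.println(readerSchema("consumer-schema", new byte[] {3}, lookup).get());
        }
    }
    ```

    With this fallback, whether `getValue` succeeds depends only on
    whether the payload can be decoded with the consumer's schema.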
    
    (cherry picked from commit ecd275dc21f33483a649e5b872990771257b1d45)
---
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  2 +-
 .../RGUsageMTAggrWaitForAllMsgsTest.java           |  7 ++-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  2 +-
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 52 ++++++++++++++++++++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 17 +++----
 .../org/apache/pulsar/client/impl/MessageImpl.java | 26 +++++++++--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 +++--
 .../pulsar/client/impl/ProducerResponse.java       | 14 +++++-
 8 files changed, 105 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 78f3d2e7a20..2baf5b0f469 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -737,7 +737,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
             completableFuture = batchProducer.sendAsync("a".getBytes());
         }
         completableFuture.get();
-        Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(350L));
+        Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(320L));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index fda8693dd84..ce3f033a3b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -556,7 +556,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 
         log.debug("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());
 
-        // Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message
+        // Pulsar runtime adds some additional bytes in the exchanges: a 42-byte per-message
         // metadata of some kind, plus more as the number of messages increases.
         // Hence the ">=" assertion with ExpectedNumBytesSent/Received in the following checks.
         final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
@@ -787,9 +787,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
     }
 
     private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMsgsTest.class);
-
-    // Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime.
-    private static final int PER_MESSAGE_METADATA_OHEAD = 45;
+    // Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
+    private static final int PER_MESSAGE_METADATA_OHEAD = 42;
 
     private static final int PUBLISH_INTERVAL_SECS = 10;
     private static final int NUM_PRODUCERS = 4;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 9f098935efa..3a7d23b06e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1217,7 +1217,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(0).value, 10);
         cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
         assertEquals(cm.size(), 1);
-        assertEquals(cm.get(0).value, 870);
+        assertEquals(cm.get(0).value, 840);
 
         pulsarClient.close();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index fd8036eaf9e..983a7f341e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -49,7 +49,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.reader.AvroReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -62,6 +61,7 @@ import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -305,7 +305,13 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                             + " if SchemaValidationEnabled is enabled");
                 }
                 Message<V2Data> msg3 = c.receive();
-                Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
+                assertNull(msg3.getSchemaVersion());
+                try {
+                    msg3.getValue();
+                    fail("Schema should be incompatible");
+                } catch (SchemaSerializationException e) {
+                    assertTrue(e.getCause() instanceof EOFException);
+                }
             } catch (PulsarClientException e) {
                 if (schemaValidationEnforced) {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException);
@@ -366,7 +372,13 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                             + " if SchemaValidationEnabled is enabled");
                 }
                 Message<V2Data> msg3 = c.receive();
-                Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
+                assertNull(msg3.getSchemaVersion());
+                try {
+                    msg3.getValue();
+                    fail("Schema should be incompatible");
+                } catch (SchemaSerializationException e) {
+                    assertTrue(e.getCause() instanceof EOFException);
+                }
             } catch (PulsarClientException e) {
                 if (schemaValidationEnforced) {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException);
@@ -1253,4 +1265,38 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
 
         }
     }
+
+    @Test
+    public void testConsumeAvroMessagesWithoutSchema() throws Exception {
+        if (schemaValidationEnforced) {
+            return;
+        }
+        final String topic = "test-consume-avro-messages-without-schema-" + UUID.randomUUID();
+        final Schema<V1Data> schema = Schema.AVRO(V1Data.class);
+        final Consumer<V1Data> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        final int numMessages = 5;
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(schema.encode(new V1Data(i)));
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            final Message<V1Data> msg = consumer.receive(3, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            log.info("Received {} from {}", msg.getValue().i, topic);
+            assertEquals(msg.getValue().i, i);
+            assertEquals(msg.getReaderSchema().orElse(Schema.BYTES).getSchemaInfo(), schema.getSchemaInfo());
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c45888f3858..682d6a52b78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -29,7 +29,6 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
-import com.google.common.base.Throwables;
 import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -1071,7 +1070,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         stopBroker();
         isTcpLookup = false;
         setup();
-        testEmptySchema();
+        testIncompatibleSchema();
     }
 
     @Test
@@ -1079,10 +1078,10 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         stopBroker();
         isTcpLookup = true;
         setup();
-        testEmptySchema();
+        testIncompatibleSchema();
     }
 
-    private void testEmptySchema() throws Exception {
+    private void testIncompatibleSchema() throws Exception {
         final String namespace = "test-namespace-" + randomName(16);
         String ns = PUBLIC_TENANT + "/" + namespace;
         admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
@@ -1116,12 +1115,14 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         producer.send("test".getBytes(StandardCharsets.UTF_8));
         Message<User> message1 = consumer.receive();
         Assert.assertEquals(test, message1.getValue());
+        Message<User> message2 = consumer.receive();
         try {
-            Message<User> message2 = consumer.receive();
             message2.getValue();
-        } catch (Throwable ex) {
-            Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
-            Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
+        } catch (SchemaSerializationException e) {
+            final String schemaString =
+                    new String(Schema.AVRO(User.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+            Assert.assertTrue(e.getMessage().contains(schemaString));
+            Assert.assertTrue(e.getMessage().contains("payload (4 bytes)"));
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 67c176cfe63..fd00475de71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -31,6 +31,7 @@ import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -391,12 +393,14 @@ public class MessageImpl<T> implements Message<T> {
         if (schema == null) {
             return Optional.empty();
         }
+        byte[] schemaVersion = getSchemaVersion();
+        if (schemaVersion == null) {
+            return Optional.of(schema);
+        }
         if (schema instanceof AutoConsumeSchema) {
-            byte[] schemaVersion = getSchemaVersion();
             return Optional.of(((AutoConsumeSchema) schema)
                     .atSchemaVersion(schemaVersion));
         } else if (schema instanceof AbstractSchema) {
-            byte[] schemaVersion = getSchemaVersion();
             return Optional.of(((AbstractSchema<?>) schema)
                     .atSchemaVersion(schemaVersion));
         } else {
@@ -404,10 +408,13 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    // For messages produced by older version producers without schema, the schema version is an empty byte array
+    // rather than null.
     @Override
     public byte[] getSchemaVersion() {
         if (msgMetadata.hasSchemaVersion()) {
-            return msgMetadata.getSchemaVersion();
+            byte[] schemaVersion = msgMetadata.getSchemaVersion();
+            return (schemaVersion.length == 0) ? null : schemaVersion;
         } else {
             return null;
         }
@@ -472,8 +479,19 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-
     private T decode(byte[] schemaVersion) {
+        try {
+            return decodeBySchema(schemaVersion);
+        } catch (ArrayIndexOutOfBoundsException e) {
+            // It usually means the message was produced without schema check while the message is not compatible with
+            // the current schema. Therefore, convert it to SchemaSerializationException with a better description.
+            final int payloadSize = payload.readableBytes();
+            throw new SchemaSerializationException("payload (" + payloadSize + " bytes) cannot be decoded with schema "
+                    + new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+        }
+    }
+
+    private T decodeBySchema(byte[] schemaVersion) {
         T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null;
         if (value != null) {
             return value;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2637c495304..d5d1d6e7382 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -675,9 +675,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 }
             } else {
                 log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
-                SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-                schemaCache.putIfAbsent(schemaHash, v);
-                msg.getMessageBuilder().setSchemaVersion(v);
+                // In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this
+                // case, we should not cache the schema version so that the schema version of the message metadata will
+                // be null, instead of an empty array.
+                if (v.length != 0) {
+                    SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
+                    schemaCache.putIfAbsent(schemaHash, v);
+                    msg.getMessageBuilder().setSchemaVersion(v);
+                }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
             }
             cnx.ctx().channel().eventLoop().execute(() -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
index 36b47f2b6d6..2c9cfa74d1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.client.impl;
 import java.util.Optional;
 
 import lombok.AllArgsConstructor;
-import lombok.Data;
+import lombok.Getter;
 
-@Data
+@Getter
 @AllArgsConstructor
 public class ProducerResponse {
     private String producerName;
@@ -31,4 +31,14 @@ public class ProducerResponse {
     private byte[] schemaVersion;
 
     private Optional<Long> topicEpoch;
+
+    // Shadow the default getter generated by lombok. In broker, if the schema version is an empty byte array, it means
+    // the topic doesn't have schema.
+    public byte[] getSchemaVersion() {
+        if (schemaVersion != null && schemaVersion.length != 0) {
+            return schemaVersion;
+        } else {
+            return null;
+        }
+    }
 }