You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:10 UTC

[pulsar] 28/31: [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a4e6e2e3651d093cc167b445531fae0e9bc24017
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu May 19 11:49:25 2022 +0800

    [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622)
    
    ### Motivation
    
    When I tried to consume a topic via a consumer with Avro schema while
    the topic was produced by a producer without schema, the consumption
    failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion`
    doesn't check whether `schemaVersion` is an empty byte array. If it is, a
    `BytesSchemaVersion` of an empty array will be passed to `cache.get` and
    then passed to `loadSchema`.
    
    https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98
    
    However, `LookupService#getSchema` cannot accept an empty byte array as
    the version, so `loadSchema` failed.
    
    The root cause is that the schema version was set unexpectedly when
    messages were sent by a producer without schema. At broker side, the
    returned schema version is never null. If the schema version was an
    empty array, then it means the message doesn't have schema. However, at
    Java client side, the empty byte array is treated as an existing schema
    and the schema version field will be set. When consumer receives the
    message, it will try to load schema whose version is an empty array.
    
    ### Modifications
    
    - When a producer receives a response whose schema version is an empty
      byte array, just ignore it.
    - Make `MessageImpl#getSchemaVersion` return null if the schema version
      is an empty byte array so that the consumer can consume messages
      produced by older version producers without schema. Also make
      `getReaderSchema` return the internal schema when `getSchemaVersion`
      returns null.
    - Fix the existing tests. Since a producer without schema won't set the
      `schema_version` field after this patch, some tests that rely on the
      precise stats should be modified.
    - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that
      messages without schema are compatible with the schema.
    
    This patch also modifies the existing behavior when
    `schemaValidationEnforced` is false and messages are produced by a
    producer without schema and consumed by a consumer with schema.
    
    1. If the message is incompatible with the schema
       - Before: `getSchemaVersion` returns an empty array and `getValue`
         fails with `SerializationException`:
    
         > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
    
       - After: `getSchemaVersion` returns `null` and `getValue` fails with
         `SchemaSerializationException`.
    
    2. Otherwise (the message is compatible with the schema)
       - Before: `getSchemaVersion` returns an empty array and `getValue`
         fails with `SerializationException`.
       - After: `getSchemaVersion` returns `null` and `getValue` returns the
         correctly decoded object.
    
    (cherry picked from commit ecd275dc21f33483a649e5b872990771257b1d45)
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  8 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  2 +-
 .../RGUsageMTAggrWaitForAllMsgsTest.java           |  6 +--
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  2 +-
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 52 ++++++++++++++++++++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 17 +++----
 .../org/apache/pulsar/client/impl/MessageImpl.java | 26 +++++++++--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 +++--
 .../pulsar/client/impl/ProducerResponse.java       | 14 +++++-
 9 files changed, 109 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a6b89112634..e8eee716faf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1291,7 +1291,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
 
         topicStats = admin.topics().getStats(topic, true, true);
-        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
+        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
         consumer.acknowledge(message);
 
@@ -1545,7 +1545,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
 
         topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
-        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
+        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
     }
 
     @Test(timeOut = 30000)
@@ -1584,7 +1584,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
 
         TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
-        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 470);
+        assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
 
         for (int i = 0; i < 5; i++) {
@@ -1594,7 +1594,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> {
             TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true);
             assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
-            assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238);
+            assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
             assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index f1040840a09..f75d41be675 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -775,7 +775,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
             completableFuture = batchProducer.sendAsync("a".getBytes());
         }
         completableFuture.get();
-        Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(350L));
+        Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(320L));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 386e8bf83b5..27f9e905262 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -551,7 +551,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
 
         log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());
 
-        // Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message
+        // Pulsar runtime adds some additional bytes in the exchanges: a 42-byte per-message
         // metadata of some kind, plus more as the number of messages increases.
         // Hence the ">=" assertion with ExpectedNumBytesSent/Received in the following checks.
         final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
@@ -769,8 +769,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
         Assert.assertNotEquals(ninthPercentileValue, 0);
     }
 
-    // Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime.
-    private static final int PER_MESSAGE_METADATA_OHEAD = 45;
+    // Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
+    private static final int PER_MESSAGE_METADATA_OHEAD = 42;
 
     private static final int PUBLISH_INTERVAL_SECS = 10;
     private static final int NUM_PRODUCERS = 4;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c2d46e2edb3..3f64619a983 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1361,7 +1361,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(0).value, 10);
         cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
         assertEquals(cm.size(), 1);
-        assertEquals(cm.get(0).value, 870);
+        assertEquals(cm.get(0).value, 840);
 
         pulsarClient.close();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index fd8036eaf9e..983a7f341e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -49,7 +49,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.reader.AvroReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -62,6 +61,7 @@ import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -305,7 +305,13 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                             + " if SchemaValidationEnabled is enabled");
                 }
                 Message<V2Data> msg3 = c.receive();
-                Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
+                assertNull(msg3.getSchemaVersion());
+                try {
+                    msg3.getValue();
+                    fail("Schema should be incompatible");
+                } catch (SchemaSerializationException e) {
+                    assertTrue(e.getCause() instanceof EOFException);
+                }
             } catch (PulsarClientException e) {
                 if (schemaValidationEnforced) {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException);
@@ -366,7 +372,13 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                             + " if SchemaValidationEnabled is enabled");
                 }
                 Message<V2Data> msg3 = c.receive();
-                Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
+                assertNull(msg3.getSchemaVersion());
+                try {
+                    msg3.getValue();
+                    fail("Schema should be incompatible");
+                } catch (SchemaSerializationException e) {
+                    assertTrue(e.getCause() instanceof EOFException);
+                }
             } catch (PulsarClientException e) {
                 if (schemaValidationEnforced) {
                     Assert.assertTrue(e instanceof IncompatibleSchemaException);
@@ -1253,4 +1265,38 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
 
         }
     }
+
+    @Test
+    public void testConsumeAvroMessagesWithoutSchema() throws Exception {
+        if (schemaValidationEnforced) {
+            return;
+        }
+        final String topic = "test-consume-avro-messages-without-schema-" + UUID.randomUUID();
+        final Schema<V1Data> schema = Schema.AVRO(V1Data.class);
+        final Consumer<V1Data> consumer = pulsarClient.newConsumer(schema)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        final int numMessages = 5;
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(schema.encode(new V1Data(i)));
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            final Message<V1Data> msg = consumer.receive(3, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            log.info("Received {} from {}", msg.getValue().i, topic);
+            assertEquals(msg.getValue().i, i);
+            assertEquals(msg.getReaderSchema().orElse(Schema.BYTES).getSchemaInfo(), schema.getSchemaInfo());
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 217e76820f1..c661ae07bf2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -29,7 +29,6 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
-import com.google.common.base.Throwables;
 import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -1119,7 +1118,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         stopBroker();
         isTcpLookup = false;
         setup();
-        testEmptySchema();
+        testIncompatibleSchema();
     }
 
     @Test
@@ -1127,10 +1126,10 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         stopBroker();
         isTcpLookup = true;
         setup();
-        testEmptySchema();
+        testIncompatibleSchema();
     }
 
-    private void testEmptySchema() throws Exception {
+    private void testIncompatibleSchema() throws Exception {
         final String namespace = "test-namespace-" + randomName(16);
         String ns = PUBLIC_TENANT + "/" + namespace;
         admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
@@ -1164,12 +1163,14 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         producer.send("test".getBytes(StandardCharsets.UTF_8));
         Message<User> message1 = consumer.receive();
         Assert.assertEquals(test, message1.getValue());
+        Message<User> message2 = consumer.receive();
         try {
-            Message<User> message2 = consumer.receive();
             message2.getValue();
-        } catch (Throwable ex) {
-            Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
-            Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
+        } catch (SchemaSerializationException e) {
+            final String schemaString =
+                    new String(Schema.AVRO(User.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+            Assert.assertTrue(e.getMessage().contains(schemaString));
+            Assert.assertTrue(e.getMessage().contains("payload (4 bytes)"));
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index a184bd145b8..2fb9311b4b5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -28,6 +28,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +41,7 @@ import lombok.Getter;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -382,12 +384,14 @@ public class MessageImpl<T> implements Message<T> {
         if (schema == null) {
             return Optional.empty();
         }
+        byte[] schemaVersion = getSchemaVersion();
+        if (schemaVersion == null) {
+            return Optional.of(schema);
+        }
         if (schema instanceof AutoConsumeSchema) {
-            byte[] schemaVersion = getSchemaVersion();
             return Optional.of(((AutoConsumeSchema) schema)
                     .atSchemaVersion(schemaVersion));
         } else if (schema instanceof AbstractSchema) {
-            byte[] schemaVersion = getSchemaVersion();
             return Optional.of(((AbstractSchema<?>) schema)
                     .atSchemaVersion(schemaVersion));
         } else {
@@ -395,10 +399,13 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    // For messages produced by older version producers without schema, the schema version is an empty byte array
+    // rather than null.
     @Override
     public byte[] getSchemaVersion() {
         if (msgMetadata.hasSchemaVersion()) {
-            return msgMetadata.getSchemaVersion();
+            byte[] schemaVersion = msgMetadata.getSchemaVersion();
+            return (schemaVersion.length == 0) ? null : schemaVersion;
         } else {
             return null;
         }
@@ -464,8 +471,19 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
-
     private T decode(byte[] schemaVersion) {
+        try {
+            return decodeBySchema(schemaVersion);
+        } catch (ArrayIndexOutOfBoundsException e) {
+            // It usually means the message was produced without schema check while the message is not compatible with
+            // the current schema. Therefore, convert it to SchemaSerializationException with a better description.
+            final int payloadSize = payload.readableBytes();
+            throw new SchemaSerializationException("payload (" + payloadSize + " bytes) cannot be decoded with schema "
+                    + new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+        }
+    }
+
+    private T decodeBySchema(byte[] schemaVersion) {
         T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null;
         if (value != null) {
             return value;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0bf37a93a3a..99197262585 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -699,9 +699,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 }
             } else {
                 log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
-                SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
-                schemaCache.putIfAbsent(schemaHash, v);
-                msg.getMessageBuilder().setSchemaVersion(v);
+                // In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this
+                // case, we should not cache the schema version so that the schema version of the message metadata will
+                // be null, instead of an empty array.
+                if (v.length != 0) {
+                    SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
+                    schemaCache.putIfAbsent(schemaHash, v);
+                    msg.getMessageBuilder().setSchemaVersion(v);
+                }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
             }
             cnx.ctx().channel().eventLoop().execute(() -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
index efe418d89cd..7d710815bf2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.client.impl;
 
 import java.util.Optional;
 import lombok.AllArgsConstructor;
-import lombok.Data;
+import lombok.Getter;
 
-@Data
+@Getter
 @AllArgsConstructor
 public class ProducerResponse {
     private String producerName;
@@ -30,4 +30,14 @@ public class ProducerResponse {
     private byte[] schemaVersion;
 
     private Optional<Long> topicEpoch;
+
+    // Shadow the default getter generated by lombok. In broker, if the schema version is an empty byte array, it means
+    // the topic doesn't have schema.
+    public byte[] getSchemaVersion() {
+        if (schemaVersion != null && schemaVersion.length != 0) {
+            return schemaVersion;
+        } else {
+            return null;
+        }
+    }
 }