You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 04:44:48 UTC

[pulsar] branch branch-2.7 updated: [Patch] Fix decode compression managedLedgerInfo data (#11569)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f83f44b  [Patch] Fix decode compression managedLedgerInfo data (#11569)
f83f44b is described below

commit f83f44b26f8def98d58bce7a486c82ca75285e75
Author: ran <ga...@126.com>
AuthorDate: Fri Aug 6 05:13:35 2021 +0800

    [Patch] Fix decode compression managedLedgerInfo data (#11569)
    
    * Unify on the `MLDataFormats.CompressionType` class within the `managed-ledger` package.
    * Fix the decode error for compressed data by decoding with the compression type recorded in the metadata.
    
    (cherry picked from commit f008777b8d32a58b9ce9d9ad6299ec3a0b181d27)
---
 .../mledger/ManagedLedgerFactoryConfig.java        |  5 +--
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     |  8 ++--
 .../service/ManagedLedgerCompressionTest.java      | 47 +++++++++++++---------
 3 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 668a953..3a987fa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -21,8 +21,7 @@ package org.apache.bookkeeper.mledger;
 import lombok.Data;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
-import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 /**
  * Configuration for a {@link ManagedLedgerFactory}.
  */
@@ -80,5 +79,5 @@ public class ManagedLedgerFactoryConfig {
     /**
      * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
      */
-    private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
+    private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index bb36065..083e641 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -317,7 +317,8 @@ public class MetaStoreImpl implements MetaStore {
             metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
             metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
 
-            encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
+            encodeByteBuf = getCompressionCodec(compressionType)
+                    .encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
 
             CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
             compositeByteBuf.addComponent(true, metadataByteBuf);
@@ -347,7 +348,8 @@ public class MetaStoreImpl implements MetaStore {
                         MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
 
                 long unpressedSize = metadata.getUncompressedSize();
-                decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
+                decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
+                        .decode(byteBuf, (int) unpressedSize);
                 byte[] decodeBytes;
                 // couldn't decode data by ZLIB compression byteBuf array() directly
                 if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
@@ -371,7 +373,7 @@ public class MetaStoreImpl implements MetaStore {
         }
     }
 
-    private CompressionCodec getCompressionCodec() {
+    private CompressionCodec getCompressionCodec(CompressionType compressionType) {
         return CompressionCodecProvider.getCompressionCodec(
                 org.apache.pulsar.common.api.proto.PulsarApi.CompressionType.valueOf(compressionType.name()));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
index 626522d..9a3bc2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.service;
 
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.pulsar.client.api.CompressionType;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -37,7 +38,7 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name());
+        conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
         super.baseSetup();
     }
 
@@ -47,7 +48,7 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
-    @Test(timeOut = 1000 * 10)
+    @Test(timeOut = 1000 * 20)
     public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception {
         String topic = newTopicName();
         @Cleanup
@@ -61,27 +62,17 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
                 .subscribe();
 
         int messageCnt = 100;
-        for (int i = 0; i < messageCnt; i++) {
-            producer.newMessage().value("test".getBytes()).send();
-        }
-        for (int i = 0; i < messageCnt; i++) {
-            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
-            consumer.acknowledge(message);
-            Assert.assertNotNull(message);
-        }
+        produceAndConsume(producer, consumer, messageCnt);
 
         stopBroker();
-        conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name());
+        conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.ZSTD.name());
         startBroker();
+        produceAndConsume(producer, consumer, messageCnt);
 
-        for (int i = 0; i < messageCnt; i++) {
-            producer.newMessage().value("test".getBytes()).send();
-        }
-        for (int i = 0; i < messageCnt; i++) {
-            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
-            Assert.assertNotNull(message);
-            consumer.acknowledge(message);
-        }
+        stopBroker();
+        conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.LZ4.name());
+        startBroker();
+        produceAndConsume(producer, consumer, messageCnt);
 
         stopBroker();
         conf.setManagedLedgerInfoCompressionType("INVALID");
@@ -94,6 +85,22 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
                     "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID",
                     e.getCause().getMessage());
         }
+
+        conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
+        startBroker();
+        produceAndConsume(producer, consumer, messageCnt);
+    }
+
+    private void produceAndConsume(Producer<byte[]> producer,
+                                   Consumer<byte[]> consumer, int messageCnt) throws PulsarClientException {
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage().value("test".getBytes()).send();
+        }
+        for (int i = 0; i < messageCnt; i++) {
+            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+            consumer.acknowledge(message);
+            Assert.assertNotNull(message);
+        }
     }
 
 }