You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/04 22:39:03 UTC

[pulsar] branch master updated: [ManagedLedger] Compress managed ledger info (#11490)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4361b6d  [ManagedLedger] Compress managed ledger info (#11490)
4361b6d is described below

commit 4361b6db563bf73232ecadafda245b33cc524aaa
Author: ran <ga...@126.com>
AuthorDate: Thu Aug 5 06:38:16 2021 +0800

    [ManagedLedger] Compress managed ledger info (#11490)
    
    * compress managed ledger info
    
    * 1. Move the `ManagedLedgerInfoMetadata` to `MLDataFormats.proto`.
    2. Add configuration managedLedgerInfoCompressionType to control ManagedLedgerInfo compression type.
    
    * use ByteBuf wrap bytes array, release ByteBuf if needed.
    
    * make the compressionType as a final field
    
    * fix comment
    
    * 1. throw exception if using a invalid compression type.
    2. change compression magic number.
    3. add a unit test to verify compression could work well.
    
    * change compression magic number.
    
    * fix test
---
 .../mledger/ManagedLedgerFactoryConfig.java        |   6 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   2 +-
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     | 118 ++++++++++++++++++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |  13 +++
 .../impl/ManagedLedgerInfoMetadataTest.java        | 130 +++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |   5 +
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   1 +
 .../service/ManagedLedgerCompressionTest.java      |  99 ++++++++++++++++
 8 files changed, 370 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index d2109ea..02ac4e9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger;
 import lombok.Data;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.pulsar.common.api.proto.CompressionType;
 
 /**
  * Configuration for a {@link ManagedLedgerFactory}.
@@ -75,4 +76,9 @@ public class ManagedLedgerFactoryConfig {
      * cluster name for prometheus stats
      */
     private String clusterName;
+
+    /**
+     * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
+     */
+    private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b135577..16d577a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -172,7 +172,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         this.bookkeeperFactory = bookKeeperGroupFactory;
         this.isBookkeeperManaged = isBookkeeperManaged;
         this.metadataStore = metadataStore;
-        this.store = new MetaStoreImpl(metadataStore, scheduledExecutor);
+        this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index caa21a1..cfe3aaf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -25,15 +25,23 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletionException;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
@@ -47,9 +55,30 @@ public class MetaStoreImpl implements MetaStore {
     private final MetadataStore store;
     private final OrderedExecutor executor;
 
+    private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
+    private final CompressionType compressionType;
+
     public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
         this.store = store;
         this.executor = executor;
+        this.compressionType = CompressionType.NONE;
+    }
+
+    public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
+        this.store = store;
+        this.executor = executor;
+        CompressionType finalCompressionType;
+        if (compressionType != null) {
+            try {
+                finalCompressionType = CompressionType.valueOf(compressionType);
+            } catch (Exception e) {
+                log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
+                throw e;
+            }
+        } else {
+            finalCompressionType = CompressionType.NONE;
+        }
+        this.compressionType = finalCompressionType;
     }
 
     @Override
@@ -62,7 +91,7 @@ public class MetaStoreImpl implements MetaStore {
                     if (optResult.isPresent()) {
                         ManagedLedgerInfo info;
                         try {
-                            info = ManagedLedgerInfo.parseFrom(optResult.get().getValue());
+                            info = parseManagedLedgerInfo(optResult.get().getValue());
                             info = updateMLInfoTimestamp(info);
                             callback.operationComplete(info, optResult.get().getStat());
                         } catch (InvalidProtocolBufferException e) {
@@ -101,9 +130,8 @@ public class MetaStoreImpl implements MetaStore {
             log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
         }
 
-        byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
         String path = PREFIX + ledgerName;
-        store.put(path, serializedMlInfo, Optional.of(stat.getVersion()))
+        store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
                 .thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
                     executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
@@ -264,4 +292,88 @@ public class MetaStoreImpl implements MetaStore {
             return new MetaStoreException(t);
         }
     }
+
+    /**
+     * Compress ManagedLedgerInfo data.
+     *
+     * compression data structure
+     * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
+      */
+    public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+        if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
+            return managedLedgerInfo.toByteArray();
+        }
+        ByteBuf metadataByteBuf = null;
+        ByteBuf encodeByteBuf = null;
+        try {
+            MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+                    .newBuilder()
+                    .setCompressionType(compressionType)
+                    .setUncompressedSize(managedLedgerInfo.getSerializedSize())
+                    .build();
+            metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
+                    mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
+            metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
+            metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
+            metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
+
+            encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
+
+            CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+            compositeByteBuf.addComponent(true, metadataByteBuf);
+            compositeByteBuf.addComponent(true, encodeByteBuf);
+            byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
+            compositeByteBuf.readBytes(dataBytes);
+            return dataBytes;
+        } finally {
+            if (metadataByteBuf != null) {
+                metadataByteBuf.release();
+            }
+            if (encodeByteBuf != null) {
+                encodeByteBuf.release();
+            }
+        }
+    }
+
+    public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+        if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
+            ByteBuf decodeByteBuf = null;
+            try {
+                int metadataSize = byteBuf.readInt();
+                byte[] metadataBytes = new byte[metadataSize];
+                byteBuf.readBytes(metadataBytes);
+                MLDataFormats.ManagedLedgerInfoMetadata metadata =
+                        MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
+
+                long unpressedSize = metadata.getUncompressedSize();
+                decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
+                byte[] decodeBytes;
+                // couldn't decode data by ZLIB compression byteBuf array() directly
+                if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
+                    decodeBytes = decodeByteBuf.array();
+                } else {
+                    decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
+                    decodeByteBuf.readBytes(decodeBytes);
+                }
+                return ManagedLedgerInfo.parseFrom(decodeBytes);
+            } catch (Exception e) {
+                log.error("Failed to parse managedLedgerInfo metadata, "
+                        + "fall back to parse managedLedgerInfo directly.", e);
+                return ManagedLedgerInfo.parseFrom(data);
+            } finally {
+                if (decodeByteBuf != null) {
+                    decodeByteBuf.release();
+                }
+            }
+        } else {
+            return ManagedLedgerInfo.parseFrom(data);
+        }
+    }
+
+    private CompressionCodec getCompressionCodec() {
+        return CompressionCodecProvider.getCompressionCodec(
+                org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
+    }
+
 }
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index a5be8e4..a3528b6 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -124,3 +124,16 @@ message ManagedCursorInfo {
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
 }
+
+enum CompressionType {
+    NONE   = 0;
+    LZ4    = 1;
+    ZLIB   = 2;
+    ZSTD   = 3;
+    SNAPPY   = 4;
+}
+
+message ManagedLedgerInfoMetadata {
+    required CompressionType compressionType = 1;
+    required int32 uncompressedSize = 2;
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
new file mode 100644
index 0000000..2f27489
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * ManagedLedgerInfo metadata test.
+ */
+@Slf4j
+public class ManagedLedgerInfoMetadataTest {
+
+    @DataProvider(name = "compressionTypeProvider")
+    private Object[][] compressionTypeProvider() {
+        return new Object[][] {
+                {null},
+                {"INVALID_TYPE"},
+                {CompressionType.NONE.name()},
+                {CompressionType.LZ4.name()},
+                {CompressionType.ZLIB.name()},
+                {CompressionType.ZSTD.name()},
+                {CompressionType.SNAPPY.name()}
+        };
+    }
+
+    @Test(dataProvider = "compressionTypeProvider")
+    public void testEncodeAndDecode(String compressionType) throws IOException {
+        long ledgerId = 10000;
+        List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoList = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder builder = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder();
+            builder.setLedgerId(ledgerId);
+            builder.setEntries(RandomUtils.nextInt());
+            builder.setSize(RandomUtils.nextLong());
+            builder.setTimestamp(System.currentTimeMillis());
+
+            UUID uuid = UUID.randomUUID();
+            builder.getOffloadContextBuilder()
+                    .setUidMsb(uuid.getMostSignificantBits())
+                    .setUidLsb(uuid.getLeastSignificantBits());
+            Map<String, String> offloadDriverMetadata = new HashMap<>();
+            offloadDriverMetadata.put("bucket", "test-bucket");
+            offloadDriverMetadata.put("managedLedgerOffloadDriver", "pulsar-offload-dev");
+            offloadDriverMetadata.put("serviceEndpoint", "https://s3.eu-west-1.amazonaws.com");
+            offloadDriverMetadata.put("region", "eu-west-1");
+            OffloadUtils.setOffloadDriverMetadata(
+                    builder,
+                    "aws-s3",
+                    offloadDriverMetadata
+            );
+
+            MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = builder.build();
+            ledgerInfoList.add(ledgerInfo);
+            ledgerId ++;
+        }
+
+        MLDataFormats.ManagedLedgerInfo managedLedgerInfo = MLDataFormats.ManagedLedgerInfo.newBuilder()
+                .addAllLedgerInfo(ledgerInfoList)
+                .build();
+
+        MetaStoreImpl metaStore;
+        try {
+            metaStore = new MetaStoreImpl(null, null, compressionType);
+            if ("INVALID_TYPE".equals(compressionType)) {
+                Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
+            }
+        } catch (Exception e) {
+            if ("INVALID_TYPE".equals(compressionType)) {
+                Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
+                Assert.assertEquals(
+                        "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
+                                + compressionType, e.getMessage());
+                return;
+            } else {
+                throw e;
+            }
+        }
+
+        byte[] compressionBytes = metaStore.compressLedgerInfo(managedLedgerInfo);
+        log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
+                compressionType, managedLedgerInfo.getSerializedSize(), compressionBytes.length);
+        if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
+            Assert.assertEquals(compressionBytes.length, managedLedgerInfo.getSerializedSize());
+        }
+
+        // parse compression data and unCompression data, check their results.
+        MLDataFormats.ManagedLedgerInfo info1 = metaStore.parseManagedLedgerInfo(compressionBytes);
+        MLDataFormats.ManagedLedgerInfo info2 = metaStore.parseManagedLedgerInfo(managedLedgerInfo.toByteArray());
+        Assert.assertEquals(info1, info2);
+    }
+
+    @Test
+    public void testParseEmptyData() throws InvalidProtocolBufferException {
+        MetaStoreImpl metaStore = new MetaStoreImpl(null, null);
+        MLDataFormats.ManagedLedgerInfo managedLedgerInfo = metaStore.parseManagedLedgerInfo(new byte[0]);
+        Assert.assertEquals(managedLedgerInfo.toString(), "");
+    }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 7894214..e9926f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1603,6 +1603,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
             .getValue();
 
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+                    + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
+    private String managedLedgerInfoCompressionType = "NONE";
+
     /*** --- Load balancer --- ****/
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 3b684bf..431cb72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -69,6 +69,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
                 conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
         managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
         managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
+        managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
 
         Configuration configuration = new ClientConfiguration();
         if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
new file mode 100644
index 0000000..626522d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * ManagedLedgerInfo compression configuration test.
+ */
+public class ManagedLedgerCompressionTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name());
+        super.baseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 1000 * 10)
+    public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception {
+        String topic = newTopicName();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        int messageCnt = 100;
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage().value("test".getBytes()).send();
+        }
+        for (int i = 0; i < messageCnt; i++) {
+            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+            consumer.acknowledge(message);
+            Assert.assertNotNull(message);
+        }
+
+        stopBroker();
+        conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name());
+        startBroker();
+
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage().value("test".getBytes()).send();
+        }
+        for (int i = 0; i < messageCnt; i++) {
+            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+            consumer.acknowledge(message);
+        }
+
+        stopBroker();
+        conf.setManagedLedgerInfoCompressionType("INVALID");
+        try {
+            startBroker();
+            Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+            Assert.assertEquals(
+                    "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID",
+                    e.getCause().getMessage());
+        }
+    }
+
+}