You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/02 05:41:56 UTC

[pulsar] branch branch-2.10 updated: [PIP-146] ManagedCursorInfo compression (#14542)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 70c77948c60 [PIP-146] ManagedCursorInfo compression (#14542)
70c77948c60 is described below

commit 70c77948c6069469501eb272f0f8bc41864e2c18
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Apr 19 17:18:25 2022 +0800

    [PIP-146] ManagedCursorInfo compression (#14542)
    
    Fixes #14529
    
    ### Motivation
    
    The cursor data is managed by ZooKeeper/etcd metadata store. When cursor data becomes more and more, the data size will increase and will take a lot of time to pull the data. Therefore, it is necessary to add compression for the cursor, which can reduce the size of data and reduce the time of pulling data.
    
    ### Modifications
    
    - Add a named `ManagedCursorInfoMetadata` message to `MLDataFormats.proto` for as compression metadata
    - Add the `managedCursorInfoCompressionType` to `org.apache.pulsar.broker.ServiceConfiguration` and `org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig`
    - This feature is the same as the implementation of ManagedLedgerInfo compression, so the code is optimized to avoid duplication
    
    (cherry picked from commit 43987336c28585021943af72d7b3a35ff8d95c19)
---
 .../mledger/ManagedLedgerFactoryConfig.java        |   5 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   3 +-
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     | 183 +++++++++++++--------
 managed-ledger/src/main/proto/MLDataFormats.proto  |   5 +
 .../impl/ManagedCursorInfoMetadataTest.java        |  96 +++++++++++
 .../impl/ManagedLedgerInfoMetadataTest.java        |  15 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   5 +
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   1 +
 8 files changed, 236 insertions(+), 77 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index a00c1616410..25fcb377e3e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -86,4 +86,9 @@ public class ManagedLedgerFactoryConfig {
      * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
      */
     private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
+
+    /**
+     * ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
+     */
+    private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f183d1ce841..5ba1dc14c44 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -184,7 +184,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         this.bookkeeperFactory = bookKeeperGroupFactory;
         this.isBookkeeperManaged = isBookkeeperManaged;
         this.metadataStore = metadataStore;
-        this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
+        this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType(),
+                config.getManagedCursorInfoCompressionType());
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 1c03c481ba4..3359b78679e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -55,30 +56,39 @@ public class MetaStoreImpl implements MetaStore {
     private final MetadataStore store;
     private final OrderedExecutor executor;
 
-    private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
-    private final CompressionType compressionType;
+    private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
+    private final CompressionType ledgerInfoCompressionType;
+    private final CompressionType cursorInfoCompressionType;
 
     public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
         this.store = store;
         this.executor = executor;
-        this.compressionType = CompressionType.NONE;
+        this.ledgerInfoCompressionType = CompressionType.NONE;
+        this.cursorInfoCompressionType = CompressionType.NONE;
     }
 
-    public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
+    public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType,
+                         String cursorInfoCompressionType) {
         this.store = store;
         this.executor = executor;
-        CompressionType finalCompressionType;
-        if (compressionType != null) {
-            try {
-                finalCompressionType = CompressionType.valueOf(compressionType);
-            } catch (Exception e) {
-                log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
-                throw e;
-            }
-        } else {
-            finalCompressionType = CompressionType.NONE;
+        this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType);
+        this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType);
+    }
+
+    private CompressionType parseCompressionType(String value) {
+        if (StringUtils.isEmpty(value)) {
+            return CompressionType.NONE;
+        }
+
+        CompressionType compressionType;
+        try {
+            compressionType = CompressionType.valueOf(value);
+        } catch (Exception e) {
+            log.error("Failed to get compression type {} error msg: {}.", value, e.getMessage());
+            throw e;
         }
-        this.compressionType = finalCompressionType;
+
+        return compressionType;
     }
 
     @Override
@@ -185,7 +195,7 @@ public class MetaStoreImpl implements MetaStore {
                 .thenAcceptAsync(optRes -> {
                     if (optRes.isPresent()) {
                         try {
-                            ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue());
+                            ManagedCursorInfo info = parseManagedCursorInfo(optRes.get().getValue());
                             callback.operationComplete(info, optRes.get().getStat());
                         } catch (InvalidProtocolBufferException e) {
                             callback.operationFailed(getException(e));
@@ -208,7 +218,7 @@ public class MetaStoreImpl implements MetaStore {
                 info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
 
         String path = PREFIX + ledgerName + "/" + cursorName;
-        byte[] content = info.toByteArray(); // Binary format
+        byte[] content = compressCursorInfo(info);
 
         long expectedVersion;
 
@@ -322,32 +332,97 @@ public class MetaStoreImpl implements MetaStore {
         }
     }
 
+    public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+        if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedLedgerInfo.toByteArray();
+        }
+        MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+                .newBuilder()
+                .setCompressionType(ledgerInfoCompressionType)
+                .setUncompressedSize(managedLedgerInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
+                mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
+    }
+
+    public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
+        if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedCursorInfo.toByteArray();
+        }
+        MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
+                .newBuilder()
+                .setCompressionType(cursorInfoCompressionType)
+                .setUncompressedSize(managedCursorInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
+                metadata.getSerializedSize(), cursorInfoCompressionType);
+    }
+
+    public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+            try {
+                MLDataFormats.ManagedLedgerInfoMetadata metadata =
+                        MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
+                return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
+                        .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
+            } catch (Exception e) {
+                log.error("Failed to parse managedLedgerInfo metadata, "
+                        + "fall back to parse managedLedgerInfo directly.", e);
+                return ManagedLedgerInfo.parseFrom(data);
+            } finally {
+                byteBuf.release();
+            }
+        } else {
+            return ManagedLedgerInfo.parseFrom(data);
+        }
+    }
+
+    public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+            try {
+                MLDataFormats.ManagedCursorInfoMetadata metadata =
+                        MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
+                return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
+                        .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
+            } catch (Exception e) {
+                log.error("Failed to parse ManagedCursorInfo metadata, "
+                        + "fall back to parse ManagedCursorInfo directly", e);
+                return ManagedCursorInfo.parseFrom(data);
+            } finally {
+                byteBuf.release();
+            }
+        } else {
+            return ManagedCursorInfo.parseFrom(data);
+        }
+    }
+
     /**
-     * Compress ManagedLedgerInfo data.
+     * Compress Managed Info data such as LedgerInfo, CursorInfo.
      *
      * compression data structure
      * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
-      */
-    public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+     */
+    private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize,
+                                       MLDataFormats.CompressionType compressionType) {
         if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
-            return managedLedgerInfo.toByteArray();
+            return info;
         }
         ByteBuf metadataByteBuf = null;
         ByteBuf encodeByteBuf = null;
         try {
-            MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
-                    .newBuilder()
-                    .setCompressionType(compressionType)
-                    .setUncompressedSize(managedLedgerInfo.getSerializedSize())
-                    .build();
-            metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
-                    mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
-            metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
-            metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
-            metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
-
+            metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
+                    metadataSerializedSize + 6);
+            metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
+            metadataByteBuf.writeInt(metadataSerializedSize);
+            metadataByteBuf.writeBytes(metadata);
             encodeByteBuf = getCompressionCodec(compressionType)
-                    .encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
+                    .encode(Unpooled.wrappedBuffer(info));
             CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
             compositeByteBuf.addComponent(true, metadataByteBuf);
             compositeByteBuf.addComponent(true, encodeByteBuf);
@@ -364,42 +439,14 @@ public class MetaStoreImpl implements MetaStore {
         }
     }
 
-    public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
-        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
-        if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
-            ByteBuf decodeByteBuf = null;
-            try {
-                int metadataSize = byteBuf.readInt();
-                byte[] metadataBytes = new byte[metadataSize];
-                byteBuf.readBytes(metadataBytes);
-                MLDataFormats.ManagedLedgerInfoMetadata metadata =
-                        MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
-
-                long unpressedSize = metadata.getUncompressedSize();
-                decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
-                        .decode(byteBuf, (int) unpressedSize);
-                byte[] decodeBytes;
-                // couldn't decode data by ZLIB compression byteBuf array() directly
-                if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
-                    decodeBytes = decodeByteBuf.array();
-                } else {
-                    decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
-                    decodeByteBuf.readBytes(decodeBytes);
-                }
-                return ManagedLedgerInfo.parseFrom(decodeBytes);
-            } catch (Exception e) {
-                log.error("Failed to parse managedLedgerInfo metadata, "
-                        + "fall back to parse managedLedgerInfo directly.", e);
-                return ManagedLedgerInfo.parseFrom(data);
-            } finally {
-                if (decodeByteBuf != null) {
-                    decodeByteBuf.release();
-                }
-                byteBuf.release();
-            }
-        } else {
-            return ManagedLedgerInfo.parseFrom(data);
+    private byte[] extractCompressMetadataBytes(ByteBuf data) {
+        if (data.readableBytes() > 0 && data.readShort() == MAGIC_MANAGED_INFO_METADATA) {
+            int metadataSize = data.readInt();
+            byte[] metadataBytes = new byte[metadataSize];
+            data.readBytes(metadataBytes);
+            return metadataBytes;
         }
+        return null;
     }
 
     private CompressionCodec getCompressionCodec(CompressionType compressionType) {
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index a3528b664e2..4671816c1a1 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -137,3 +137,8 @@ message ManagedLedgerInfoMetadata {
     required CompressionType compressionType = 1;
     required int32 uncompressedSize = 2;
 }
+
+message ManagedCursorInfoMetadata {
+    required CompressionType compressionType = 1;
+    required int32 uncompressedSize = 2;
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java
new file mode 100644
index 00000000000..8b95876d0ae
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.expectThrows;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * ManagedCursorInfo metadata test.
+ */
+@Slf4j
+public class ManagedCursorInfoMetadataTest {
+    private final String INVALID_TYPE = "INVALID_TYPE";
+
+    @DataProvider(name = "compressionTypeProvider")
+    private Object[][] compressionTypeProvider() {
+        return new Object[][]{
+                {null},
+                {INVALID_TYPE},
+                {CompressionType.NONE.name()},
+                {CompressionType.LZ4.name()},
+                {CompressionType.ZLIB.name()},
+                {CompressionType.ZSTD.name()},
+                {CompressionType.SNAPPY.name()}
+        };
+    }
+
+    @Test(dataProvider = "compressionTypeProvider")
+    public void testEncodeAndDecode(String compressionType) throws IOException {
+        long ledgerId = 10000;
+        MLDataFormats.ManagedCursorInfo.Builder builder = MLDataFormats.ManagedCursorInfo.newBuilder();
+
+        builder.setCursorsLedgerId(ledgerId);
+        builder.setMarkDeleteLedgerId(ledgerId);
+
+        List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchedEntryDeletionIndexInfos = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            MLDataFormats.NestedPositionInfo nestedPositionInfo = MLDataFormats.NestedPositionInfo.newBuilder()
+                    .setEntryId(i).setLedgerId(i).build();
+            MLDataFormats.BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = MLDataFormats
+                    .BatchedEntryDeletionIndexInfo.newBuilder().setPosition(nestedPositionInfo).build();
+            batchedEntryDeletionIndexInfos.add(batchedEntryDeletionIndexInfo);
+        }
+        builder.addAllBatchedEntryDeletionIndexInfo(batchedEntryDeletionIndexInfos);
+
+        MetaStoreImpl metaStore;
+        if (INVALID_TYPE.equals(compressionType)) {
+            IllegalArgumentException compressionTypeEx = expectThrows(IllegalArgumentException.class, () -> {
+                new MetaStoreImpl(null, null, null, compressionType);
+            });
+            assertEquals("No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
+                    + compressionType, compressionTypeEx.getMessage());
+            return;
+        } else {
+            metaStore = new MetaStoreImpl(null, null, null, compressionType);
+        }
+
+        MLDataFormats.ManagedCursorInfo managedCursorInfo = builder.build();
+        byte[] compressionBytes = metaStore.compressCursorInfo(managedCursorInfo);
+        log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
+                compressionType, managedCursorInfo.getSerializedSize(), compressionBytes.length);
+        if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
+            Assert.assertEquals(compressionBytes.length, managedCursorInfo.getSerializedSize());
+        }
+
+        // parse compression data and unCompression data, check their results.
+        MLDataFormats.ManagedCursorInfo info1 = metaStore.parseManagedCursorInfo(compressionBytes);
+        MLDataFormats.ManagedCursorInfo info2 = metaStore.parseManagedCursorInfo(managedCursorInfo.toByteArray());
+        Assert.assertEquals(info1, info2);
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
index 2f27489aeb9..91bc7f143a4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
@@ -19,6 +19,12 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -28,13 +34,6 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
 /**
  * ManagedLedgerInfo metadata test.
  */
@@ -91,7 +90,7 @@ public class ManagedLedgerInfoMetadataTest {
 
         MetaStoreImpl metaStore;
         try {
-            metaStore = new MetaStoreImpl(null, null, compressionType);
+            metaStore = new MetaStoreImpl(null, null, compressionType, null);
             if ("INVALID_TYPE".equals(compressionType)) {
                 Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
             }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 466bf3df8e0..2eae2bac7d9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1871,6 +1871,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
     private String managedLedgerInfoCompressionType = "NONE";
 
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+                    + "If value is NONE, then save the ManagedCursorInfo bytes data directly.")
+    private String managedCursorInfoCompressionType = "NONE";
+
     /*** --- Load balancer. --- ****/
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index da45f32863a..f06679ece78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -69,6 +69,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
         managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
         managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
         managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());
+        managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(conf.getManagedCursorInfoCompressionType());
 
         Configuration configuration = new ClientConfiguration();
         if (conf.isBookkeeperClientExposeStatsToPrometheus()) {