You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/05 01:48:38 UTC
[pulsar] 04/04: [ManagedLedger] Compress managed ledger info
(#11490)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b6d0934672d9b3892594df10efd7adbc00833e38
Author: ran <ga...@126.com>
AuthorDate: Thu Aug 5 06:38:16 2021 +0800
[ManagedLedger] Compress managed ledger info (#11490)
* compress managed ledger info
* 1. Move the `ManagedLedgerInfoMetadata` to `MLDataFormats.proto`.
2. Add configuration managedLedgerInfoCompressionType to control ManagedLedgerInfo compression type.
* use ByteBuf wrap bytes array, release ByteBuf if needed.
* make the compressionType as a final field
* fix comment
* 1. throw exception if using a invalid compression type.
2. change compression magic number.
3. add a unit test to verify compression could work well.
* change compression magic number.
* fix test
(cherry picked from commit 4361b6db563bf73232ecadafda245b33cc524aaa)
---
.../mledger/ManagedLedgerFactoryConfig.java | 6 +
.../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +-
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 118 ++++++++++++++++++-
managed-ledger/src/main/proto/MLDataFormats.proto | 13 +++
.../impl/ManagedLedgerInfoMetadataTest.java | 130 +++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 5 +
.../pulsar/broker/ManagedLedgerClientFactory.java | 1 +
.../service/ManagedLedgerCompressionTest.java | 99 ++++++++++++++++
8 files changed, 370 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index d2109ea..02ac4e9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger;
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.pulsar.common.api.proto.CompressionType;
/**
* Configuration for a {@link ManagedLedgerFactory}.
@@ -75,4 +76,9 @@ public class ManagedLedgerFactoryConfig {
* cluster name for prometheus stats
*/
private String clusterName;
+
+ /**
+ * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
+ */
+ private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b135577..16d577a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -172,7 +172,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.metadataStore = metadataStore;
- this.store = new MetaStoreImpl(metadataStore, scheduledExecutor);
+ this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index caa21a1..cfe3aaf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -25,15 +25,23 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
@@ -47,9 +55,30 @@ public class MetaStoreImpl implements MetaStore {
private final MetadataStore store;
private final OrderedExecutor executor;
+ private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
+ private final CompressionType compressionType;
+
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
+ this.compressionType = CompressionType.NONE;
+ }
+
+ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
+ this.store = store;
+ this.executor = executor;
+ CompressionType finalCompressionType;
+ if (compressionType != null) {
+ try {
+ finalCompressionType = CompressionType.valueOf(compressionType);
+ } catch (Exception e) {
+ log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
+ throw e;
+ }
+ } else {
+ finalCompressionType = CompressionType.NONE;
+ }
+ this.compressionType = finalCompressionType;
}
@Override
@@ -62,7 +91,7 @@ public class MetaStoreImpl implements MetaStore {
if (optResult.isPresent()) {
ManagedLedgerInfo info;
try {
- info = ManagedLedgerInfo.parseFrom(optResult.get().getValue());
+ info = parseManagedLedgerInfo(optResult.get().getValue());
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, optResult.get().getStat());
} catch (InvalidProtocolBufferException e) {
@@ -101,9 +130,8 @@ public class MetaStoreImpl implements MetaStore {
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
}
- byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
String path = PREFIX + ledgerName;
- store.put(path, serializedMlInfo, Optional.of(stat.getVersion()))
+ store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
.thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
@@ -264,4 +292,88 @@ public class MetaStoreImpl implements MetaStore {
return new MetaStoreException(t);
}
}
+
+ /**
+ * Compress ManagedLedgerInfo data.
+ *
+ * compression data structure
+ * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
+ */
+ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+ if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
+ return managedLedgerInfo.toByteArray();
+ }
+ ByteBuf metadataByteBuf = null;
+ ByteBuf encodeByteBuf = null;
+ try {
+ MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+ .newBuilder()
+ .setCompressionType(compressionType)
+ .setUncompressedSize(managedLedgerInfo.getSerializedSize())
+ .build();
+ metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
+ mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
+ metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
+ metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
+ metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
+
+ encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
+
+ CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponent(true, metadataByteBuf);
+ compositeByteBuf.addComponent(true, encodeByteBuf);
+ byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
+ compositeByteBuf.readBytes(dataBytes);
+ return dataBytes;
+ } finally {
+ if (metadataByteBuf != null) {
+ metadataByteBuf.release();
+ }
+ if (encodeByteBuf != null) {
+ encodeByteBuf.release();
+ }
+ }
+ }
+
+ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+ if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
+ ByteBuf decodeByteBuf = null;
+ try {
+ int metadataSize = byteBuf.readInt();
+ byte[] metadataBytes = new byte[metadataSize];
+ byteBuf.readBytes(metadataBytes);
+ MLDataFormats.ManagedLedgerInfoMetadata metadata =
+ MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
+
+ long unpressedSize = metadata.getUncompressedSize();
+ decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
+ byte[] decodeBytes;
+ // couldn't decode data by ZLIB compression byteBuf array() directly
+ if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
+ decodeBytes = decodeByteBuf.array();
+ } else {
+ decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
+ decodeByteBuf.readBytes(decodeBytes);
+ }
+ return ManagedLedgerInfo.parseFrom(decodeBytes);
+ } catch (Exception e) {
+ log.error("Failed to parse managedLedgerInfo metadata, "
+ + "fall back to parse managedLedgerInfo directly.", e);
+ return ManagedLedgerInfo.parseFrom(data);
+ } finally {
+ if (decodeByteBuf != null) {
+ decodeByteBuf.release();
+ }
+ }
+ } else {
+ return ManagedLedgerInfo.parseFrom(data);
+ }
+ }
+
+ private CompressionCodec getCompressionCodec() {
+ return CompressionCodecProvider.getCompressionCodec(
+ org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
+ }
+
}
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index a5be8e4..a3528b6 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -124,3 +124,16 @@ message ManagedCursorInfo {
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}
+
+enum CompressionType {
+ NONE = 0;
+ LZ4 = 1;
+ ZLIB = 2;
+ ZSTD = 3;
+ SNAPPY = 4;
+}
+
+message ManagedLedgerInfoMetadata {
+ required CompressionType compressionType = 1;
+ required int32 uncompressedSize = 2;
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
new file mode 100644
index 0000000..2f27489
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * ManagedLedgerInfo metadata test.
+ */
+@Slf4j
+public class ManagedLedgerInfoMetadataTest {
+
+ @DataProvider(name = "compressionTypeProvider")
+ private Object[][] compressionTypeProvider() {
+ return new Object[][] {
+ {null},
+ {"INVALID_TYPE"},
+ {CompressionType.NONE.name()},
+ {CompressionType.LZ4.name()},
+ {CompressionType.ZLIB.name()},
+ {CompressionType.ZSTD.name()},
+ {CompressionType.SNAPPY.name()}
+ };
+ }
+
+ @Test(dataProvider = "compressionTypeProvider")
+ public void testEncodeAndDecode(String compressionType) throws IOException {
+ long ledgerId = 10000;
+ List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoList = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder builder = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder();
+ builder.setLedgerId(ledgerId);
+ builder.setEntries(RandomUtils.nextInt());
+ builder.setSize(RandomUtils.nextLong());
+ builder.setTimestamp(System.currentTimeMillis());
+
+ UUID uuid = UUID.randomUUID();
+ builder.getOffloadContextBuilder()
+ .setUidMsb(uuid.getMostSignificantBits())
+ .setUidLsb(uuid.getLeastSignificantBits());
+ Map<String, String> offloadDriverMetadata = new HashMap<>();
+ offloadDriverMetadata.put("bucket", "test-bucket");
+ offloadDriverMetadata.put("managedLedgerOffloadDriver", "pulsar-offload-dev");
+ offloadDriverMetadata.put("serviceEndpoint", "https://s3.eu-west-1.amazonaws.com");
+ offloadDriverMetadata.put("region", "eu-west-1");
+ OffloadUtils.setOffloadDriverMetadata(
+ builder,
+ "aws-s3",
+ offloadDriverMetadata
+ );
+
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = builder.build();
+ ledgerInfoList.add(ledgerInfo);
+ ledgerId ++;
+ }
+
+ MLDataFormats.ManagedLedgerInfo managedLedgerInfo = MLDataFormats.ManagedLedgerInfo.newBuilder()
+ .addAllLedgerInfo(ledgerInfoList)
+ .build();
+
+ MetaStoreImpl metaStore;
+ try {
+ metaStore = new MetaStoreImpl(null, null, compressionType);
+ if ("INVALID_TYPE".equals(compressionType)) {
+ Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
+ }
+ } catch (Exception e) {
+ if ("INVALID_TYPE".equals(compressionType)) {
+ Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
+ Assert.assertEquals(
+ "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
+ + compressionType, e.getMessage());
+ return;
+ } else {
+ throw e;
+ }
+ }
+
+ byte[] compressionBytes = metaStore.compressLedgerInfo(managedLedgerInfo);
+ log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
+ compressionType, managedLedgerInfo.getSerializedSize(), compressionBytes.length);
+ if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
+ Assert.assertEquals(compressionBytes.length, managedLedgerInfo.getSerializedSize());
+ }
+
+ // parse compression data and unCompression data, check their results.
+ MLDataFormats.ManagedLedgerInfo info1 = metaStore.parseManagedLedgerInfo(compressionBytes);
+ MLDataFormats.ManagedLedgerInfo info2 = metaStore.parseManagedLedgerInfo(managedLedgerInfo.toByteArray());
+ Assert.assertEquals(info1, info2);
+ }
+
+ @Test
+ public void testParseEmptyData() throws InvalidProtocolBufferException {
+ MetaStoreImpl metaStore = new MetaStoreImpl(null, null);
+ MLDataFormats.ManagedLedgerInfo managedLedgerInfo = metaStore.parseManagedLedgerInfo(new byte[0]);
+ Assert.assertEquals(managedLedgerInfo.toString(), "");
+ }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ebefd59..d8fcbd9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1581,6 +1581,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
.getValue();
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
+ private String managedLedgerInfoCompressionType = "NONE";
+
/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 3b684bf..431cb72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -69,6 +69,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
+ managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());
Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
new file mode 100644
index 0000000..626522d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * ManagedLedgerInfo compression configuration test.
+ */
+public class ManagedLedgerCompressionTest extends BrokerTestBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name());
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 1000 * 10)
+ public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception {
+ String topic = newTopicName();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ int messageCnt = 100;
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage().value("test".getBytes()).send();
+ }
+ for (int i = 0; i < messageCnt; i++) {
+ Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+ consumer.acknowledge(message);
+ Assert.assertNotNull(message);
+ }
+
+ stopBroker();
+ conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name());
+ startBroker();
+
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage().value("test".getBytes()).send();
+ }
+ for (int i = 0; i < messageCnt; i++) {
+ Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ consumer.acknowledge(message);
+ }
+
+ stopBroker();
+ conf.setManagedLedgerInfoCompressionType("INVALID");
+ try {
+ startBroker();
+ Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ Assert.assertEquals(
+ "No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID",
+ e.getCause().getMessage());
+ }
+ }
+
+}