You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 01:45:41 UTC
[pulsar] 04/04: [Patch] Fix decode compression managedLedgerInfo data (#11569)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9268cb121503fb22132a36cc0108469f00e3bc08
Author: ran <ga...@126.com>
AuthorDate: Fri Aug 6 05:13:35 2021 +0800
[Patch] Fix decode compression managedLedgerInfo data (#11569)
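* Consistently use the class `MLDataFormats.CompressionType` within the `managed-ledger` package.
* Fix the error when decoding compressed data: decode with the compression type recorded in the stored metadata rather than the currently configured type.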
(cherry picked from commit f008777b8d32a58b9ce9d9ad6299ec3a0b181d27)
---
.../mledger/ManagedLedgerFactoryConfig.java | 4 +-
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 8 ++--
.../service/ManagedLedgerCompressionTest.java | 47 +++++++++++++---------
3 files changed, 34 insertions(+), 25 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 02ac4e9..ef92957 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -21,7 +21,7 @@ package org.apache.bookkeeper.mledger;
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
-import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
/**
* Configuration for a {@link ManagedLedgerFactory}.
@@ -80,5 +80,5 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
*/
- private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
+ private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index cfe3aaf..9558d11 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -317,7 +317,8 @@ public class MetaStoreImpl implements MetaStore {
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
- encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
+ encodeByteBuf = getCompressionCodec(compressionType)
+ .encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
@@ -347,7 +348,8 @@ public class MetaStoreImpl implements MetaStore {
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
long unpressedSize = metadata.getUncompressedSize();
- decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
+ decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
+ .decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
@@ -371,7 +373,7 @@ public class MetaStoreImpl implements MetaStore {
}
}
- private CompressionCodec getCompressionCodec() {
+ private CompressionCodec getCompressionCodec(CompressionType compressionType) {
return CompressionCodecProvider.getCompressionCodec(
org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
index 626522d..9a3bc2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.service;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.pulsar.client.api.CompressionType;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -37,7 +38,7 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
- conf.setManagedLedgerInfoCompressionType(CompressionType.NONE.name());
+ conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
super.baseSetup();
}
@@ -47,7 +48,7 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
super.internalCleanup();
}
- @Test(timeOut = 1000 * 10)
+ @Test(timeOut = 1000 * 20)
public void testRestartBrokerEnableManagedLedgerInfoCompression() throws Exception {
String topic = newTopicName();
@Cleanup
@@ -61,27 +62,17 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
.subscribe();
int messageCnt = 100;
- for (int i = 0; i < messageCnt; i++) {
- producer.newMessage().value("test".getBytes()).send();
- }
- for (int i = 0; i < messageCnt; i++) {
- Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
- consumer.acknowledge(message);
- Assert.assertNotNull(message);
- }
+ produceAndConsume(producer, consumer, messageCnt);
stopBroker();
- conf.setManagedLedgerInfoCompressionType(CompressionType.ZSTD.name());
+ conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.ZSTD.name());
startBroker();
+ produceAndConsume(producer, consumer, messageCnt);
- for (int i = 0; i < messageCnt; i++) {
- producer.newMessage().value("test".getBytes()).send();
- }
- for (int i = 0; i < messageCnt; i++) {
- Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
- consumer.acknowledge(message);
- }
+ stopBroker();
+ conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.LZ4.name());
+ startBroker();
+ produceAndConsume(producer, consumer, messageCnt);
stopBroker();
conf.setManagedLedgerInfoCompressionType("INVALID");
@@ -94,6 +85,22 @@ public class ManagedLedgerCompressionTest extends BrokerTestBase {
"No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType.INVALID",
e.getCause().getMessage());
}
+
+ conf.setManagedLedgerInfoCompressionType(MLDataFormats.CompressionType.NONE.name());
+ startBroker();
+ produceAndConsume(producer, consumer, messageCnt);
+ }
+
+ private void produceAndConsume(Producer<byte[]> producer,
+ Consumer<byte[]> consumer, int messageCnt) throws PulsarClientException {
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage().value("test".getBytes()).send();
+ }
+ for (int i = 0; i < messageCnt; i++) {
+ Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+ Assert.assertNotNull(message); // assert before ack so a null message fails here, not inside acknowledge()
+ consumer.acknowledge(message);
+ }
}
}