You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/11 09:07:28 UTC

[GitHub] [pulsar] nodece commented on a diff in pull request #14542: [PIP-146] ManagedCursorInfo compression

nodece commented on code in PR #14542:
URL: https://github.com/apache/pulsar/pull/14542#discussion_r847106183


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -322,32 +334,99 @@ private static MetaStoreException getException(Throwable t) {
         }
     }
 
+    public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+        if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedLedgerInfo.toByteArray();
+        }
+        MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+                .newBuilder()
+                .setCompressionType(ledgerInfoCompressionType)
+                .setUncompressedSize(managedLedgerInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
+                mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
+    }
+
+    public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
+        if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedCursorInfo.toByteArray();
+        }
+        MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
+                .newBuilder()
+                .setCompressionType(cursorInfoCompressionType)
+                .setUncompressedSize(managedCursorInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
+                metadata.getSerializedSize(), cursorInfoCompressionType);
+    }
+
+    public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+        try {
+                MLDataFormats.ManagedLedgerInfoMetadata metadata =
+                        MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
+                byte[] decodeBytes =
+                        parseCompressedInfo(byteBuf, metadata.getCompressionType(), metadata.getUncompressedSize());
+                return ManagedLedgerInfo.parseFrom(decodeBytes);
+            } catch (Exception e) {
+                log.error("Failed to parse managedLedgerInfo metadata, "
+                        + "fall back to parse managedLedgerInfo directly.", e);
+                return ManagedLedgerInfo.parseFrom(data);
+            } finally {
+                byteBuf.release();
+            }
+        } else {
+            return ManagedLedgerInfo.parseFrom(data);
+        }
+    }
+
+    public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+            try {
+                MLDataFormats.ManagedCursorInfoMetadata metadata =
+                        MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
+                byte[] decodeBytes =
+                        parseCompressedInfo(byteBuf, metadata.getCompressionType(), metadata.getUncompressedSize());
+                return ManagedCursorInfo.parseFrom(decodeBytes);

Review Comment:
   This ByteBuf is provided by Netty, not by the Java standard library.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -322,32 +334,99 @@ private static MetaStoreException getException(Throwable t) {
         }
     }
 
+    public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
+        if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedLedgerInfo.toByteArray();
+        }
+        MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+                .newBuilder()
+                .setCompressionType(ledgerInfoCompressionType)
+                .setUncompressedSize(managedLedgerInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
+                mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
+    }
+
+    public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
+        if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
+            return managedCursorInfo.toByteArray();
+        }
+        MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
+                .newBuilder()
+                .setCompressionType(cursorInfoCompressionType)
+                .setUncompressedSize(managedCursorInfo.getSerializedSize())
+                .build();
+        return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
+                metadata.getSerializedSize(), cursorInfoCompressionType);
+    }
+
+    public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+        try {
+                MLDataFormats.ManagedLedgerInfoMetadata metadata =
+                        MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
+                byte[] decodeBytes =
+                        parseCompressedInfo(byteBuf, metadata.getCompressionType(), metadata.getUncompressedSize());
+                return ManagedLedgerInfo.parseFrom(decodeBytes);
+            } catch (Exception e) {
+                log.error("Failed to parse managedLedgerInfo metadata, "
+                        + "fall back to parse managedLedgerInfo directly.", e);
+                return ManagedLedgerInfo.parseFrom(data);
+            } finally {
+                byteBuf.release();
+            }
+        } else {
+            return ManagedLedgerInfo.parseFrom(data);
+        }
+    }
+
+    public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
+
+        byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
+        if (metadataBytes != null) {
+            try {
+                MLDataFormats.ManagedCursorInfoMetadata metadata =
+                        MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
+                byte[] decodeBytes =
+                        parseCompressedInfo(byteBuf, metadata.getCompressionType(), metadata.getUncompressedSize());
+                return ManagedCursorInfo.parseFrom(decodeBytes);

Review Comment:
   @codelipenghui 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org