You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/11 05:42:03 UTC

[rocketmq-clients] 01/02: Java: adapt to the latest protocol

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 5483046a5c2c54b8246c99fd7000a45aa79a1a0e
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 11 11:47:53 2022 +0800

    Java: adapt to the latest protocol
---
 .../java/impl/producer/ProducerSettings.java       | 17 ----------
 .../client/java/message/PublishingMessageImpl.java | 37 ++--------------------
 2 files changed, 2 insertions(+), 52 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 9f6f261..81e6e8b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -41,15 +41,8 @@ public class ProducerSettings extends ClientSettings {
     /**
      * If message body size exceeds the threshold, it would be compressed for convenience of transport.
      */
-    private volatile int compressBodyThresholdBytes = 4 * 1024;
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
-
     private volatile boolean validateMessageType = true;
-    /**
-     * The default GZIP compression level for the message body.
-     */
-    @SuppressWarnings("FieldCanBeLocal")
-    private final int messageGzipCompressionLevel = 5;
 
     public ProducerSettings(String clientId, Endpoints accessPoint,
         ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy, Duration requestTimeout, Set<Resource> topics) {
@@ -57,10 +50,6 @@ public class ProducerSettings extends ClientSettings {
         this.topics = topics;
     }
 
-    public int getCompressBodyThresholdBytes() {
-        return compressBodyThresholdBytes;
-    }
-
     public int getMaxBodySizeBytes() {
         return maxBodySizeBytes;
     }
@@ -69,10 +58,6 @@ public class ProducerSettings extends ClientSettings {
         return validateMessageType;
     }
 
-    public int getMessageGzipCompressionLevel() {
-        return messageGzipCompressionLevel;
-    }
-
     @Override
     public Settings toProtobuf() {
         final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
@@ -93,7 +78,6 @@ public class ProducerSettings extends ClientSettings {
             return;
         }
         final Publishing publishing = settings.getPublishing();
-        this.compressBodyThresholdBytes = publishing.getCompressBodyThreshold();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
         this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
@@ -107,7 +91,6 @@ public class ProducerSettings extends ClientSettings {
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
             .add("topics", topics)
-            .add("compressBodyThresholdBytes", compressBodyThresholdBytes)
             .add("maxBodySizeBytes", maxBodySizeBytes)
             .toString();
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index b419531..72afd9a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -22,25 +22,18 @@ import apache.rocketmq.v2.SystemProperties;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.util.Timestamps;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Optional;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
 import org.apache.rocketmq.client.java.message.protocol.Encoding;
 import org.apache.rocketmq.client.java.misc.Utilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class is a publishing view for message, which could be considered as an extension of {@link MessageImpl}.
  * Specifically speaking, Some work has been brought forward, e.g. message body compression, message id generation, etc.
  */
 public class PublishingMessageImpl extends MessageImpl {
-    private static final Logger LOGGER = LoggerFactory.getLogger(PublishingMessageImpl.class);
-
-    private final Encoding encoding;
-    private final ByteBuffer compressedBody;
     private final MessageId messageId;
     private final MessageType messageType;
     private volatile String traceContext;
@@ -54,28 +47,6 @@ public class PublishingMessageImpl extends MessageImpl {
         if (length > maxBodySizeBytes) {
             throw new IOException("Message body size exceeds the threshold, max size=" + maxBodySizeBytes + " bytes");
         }
-        // Message body length exceeds the compression threshold, try to compress it.
-        if (length > producerSettings.getCompressBodyThresholdBytes()) {
-            byte[] body;
-            // Try downcasting to avoid redundant copy because ByteBuffer could not be compressed directly.
-            if (message instanceof MessageImpl) {
-                MessageImpl messageImpl = (MessageImpl) message;
-                body = messageImpl.body;
-            } else {
-                // Failed to downcast, which is out of expectation.
-                LOGGER.error("[Bug] message is not an instance of MessageImpl, have to copy it to compress");
-                body = new byte[length];
-                message.getBody().get(body);
-            }
-            final byte[] compressed = Utilities.compressBytesGzip(body,
-                producerSettings.getMessageGzipCompressionLevel());
-            this.compressedBody = ByteBuffer.wrap(compressed).asReadOnlyBuffer();
-            this.encoding = Encoding.GZIP;
-        } else {
-            // No need to compress message body.
-            this.compressedBody = null;
-            this.encoding = Encoding.IDENTITY;
-        }
         // Generate message id.
         this.messageId = MessageIdCodec.getInstance().nextMessageId();
         // Normal message.
@@ -112,10 +83,6 @@ public class PublishingMessageImpl extends MessageImpl {
         return messageType;
     }
 
-    public ByteBuffer getTransportBody() {
-        return null == compressedBody ? getBody() : compressedBody;
-    }
-
     public void setTraceContext(String traceContext) {
         this.traceContext = traceContext;
     }
@@ -142,7 +109,7 @@ public class PublishingMessageImpl extends MessageImpl {
                 // Born host
                 .setBornHost(Utilities.hostName())
                 // Body encoding
-                .setBodyEncoding(Encoding.toProtobuf(encoding))
+                .setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
                 // Message type
                 .setMessageType(MessageType.toProtobuf(messageType));
         // Message tag
@@ -160,7 +127,7 @@ public class PublishingMessageImpl extends MessageImpl {
             // Topic
             .setTopic(topicResource)
             // Message body
-            .setBody(ByteString.copyFrom(getTransportBody()))
+            .setBody(ByteString.copyFrom(getBody()))
             // System properties
             .setSystemProperties(systemProperties)
             // User properties