You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/11 05:42:03 UTC
[rocketmq-clients] 01/02: Java: adapt to the latest protocol
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 5483046a5c2c54b8246c99fd7000a45aa79a1a0e
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 11 11:47:53 2022 +0800
Java: adapt to the latest protocol
---
.../java/impl/producer/ProducerSettings.java | 17 ----------
.../client/java/message/PublishingMessageImpl.java | 37 ++--------------------
2 files changed, 2 insertions(+), 52 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 9f6f261..81e6e8b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -41,15 +41,8 @@ public class ProducerSettings extends ClientSettings {
/**
* If message body size exceeds the threshold, it would be compressed for convenience of transport.
*/
- private volatile int compressBodyThresholdBytes = 4 * 1024;
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
-
private volatile boolean validateMessageType = true;
- /**
- * The default GZIP compression level for the message body.
- */
- @SuppressWarnings("FieldCanBeLocal")
- private final int messageGzipCompressionLevel = 5;
public ProducerSettings(String clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy, Duration requestTimeout, Set<Resource> topics) {
@@ -57,10 +50,6 @@ public class ProducerSettings extends ClientSettings {
this.topics = topics;
}
- public int getCompressBodyThresholdBytes() {
- return compressBodyThresholdBytes;
- }
-
public int getMaxBodySizeBytes() {
return maxBodySizeBytes;
}
@@ -69,10 +58,6 @@ public class ProducerSettings extends ClientSettings {
return validateMessageType;
}
- public int getMessageGzipCompressionLevel() {
- return messageGzipCompressionLevel;
- }
-
@Override
public Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
@@ -93,7 +78,6 @@ public class ProducerSettings extends ClientSettings {
return;
}
final Publishing publishing = settings.getPublishing();
- this.compressBodyThresholdBytes = publishing.getCompressBodyThreshold();
this.maxBodySizeBytes = publishing.getMaxBodySize();
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
@@ -107,7 +91,6 @@ public class ProducerSettings extends ClientSettings {
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
.add("topics", topics)
- .add("compressBodyThresholdBytes", compressBodyThresholdBytes)
.add("maxBodySizeBytes", maxBodySizeBytes)
.toString();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index b419531..72afd9a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -22,25 +22,18 @@ import apache.rocketmq.v2.SystemProperties;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
import org.apache.rocketmq.client.java.message.protocol.Encoding;
import org.apache.rocketmq.client.java.misc.Utilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is a publishing view for message, which could be considered as an extension of {@link MessageImpl}.
* Specifically speaking, Some work has been brought forward, e.g. message body compression, message id generation, etc.
*/
public class PublishingMessageImpl extends MessageImpl {
- private static final Logger LOGGER = LoggerFactory.getLogger(PublishingMessageImpl.class);
-
- private final Encoding encoding;
- private final ByteBuffer compressedBody;
private final MessageId messageId;
private final MessageType messageType;
private volatile String traceContext;
@@ -54,28 +47,6 @@ public class PublishingMessageImpl extends MessageImpl {
if (length > maxBodySizeBytes) {
throw new IOException("Message body size exceeds the threshold, max size=" + maxBodySizeBytes + " bytes");
}
- // Message body length exceeds the compression threshold, try to compress it.
- if (length > producerSettings.getCompressBodyThresholdBytes()) {
- byte[] body;
- // Try downcasting to avoid redundant copy because ByteBuffer could not be compressed directly.
- if (message instanceof MessageImpl) {
- MessageImpl messageImpl = (MessageImpl) message;
- body = messageImpl.body;
- } else {
- // Failed to downcast, which is out of expectation.
- LOGGER.error("[Bug] message is not an instance of MessageImpl, have to copy it to compress");
- body = new byte[length];
- message.getBody().get(body);
- }
- final byte[] compressed = Utilities.compressBytesGzip(body,
- producerSettings.getMessageGzipCompressionLevel());
- this.compressedBody = ByteBuffer.wrap(compressed).asReadOnlyBuffer();
- this.encoding = Encoding.GZIP;
- } else {
- // No need to compress message body.
- this.compressedBody = null;
- this.encoding = Encoding.IDENTITY;
- }
// Generate message id.
this.messageId = MessageIdCodec.getInstance().nextMessageId();
// Normal message.
@@ -112,10 +83,6 @@ public class PublishingMessageImpl extends MessageImpl {
return messageType;
}
- public ByteBuffer getTransportBody() {
- return null == compressedBody ? getBody() : compressedBody;
- }
-
public void setTraceContext(String traceContext) {
this.traceContext = traceContext;
}
@@ -142,7 +109,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Born host
.setBornHost(Utilities.hostName())
// Body encoding
- .setBodyEncoding(Encoding.toProtobuf(encoding))
+ .setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
// Message type
.setMessageType(MessageType.toProtobuf(messageType));
// Message tag
@@ -160,7 +127,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Topic
.setTopic(topicResource)
// Message body
- .setBody(ByteString.copyFrom(getTransportBody()))
+ .setBody(ByteString.copyFrom(getBody()))
// System properties
.setSystemProperties(systemProperties)
// User properties