You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/05/15 08:59:29 UTC

[GitHub] vongosling closed pull request #294: [ISSUE #66] duplicate compress message body if retry to send msg when…

vongosling closed pull request #294: [ISSUE #66] duplicate compress message body if retry to send msg when…
URL: https://github.com/apache/rocketmq/pull/294
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign)
pull request once it has been merged, the diff is reproduced below
for the sake of provenance:

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b07778499..d4ed1ec4d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,6 +27,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 9dd8ee3ce..81461f5e9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -607,8 +607,10 @@ private SendResult sendKernelImpl(final Message msg,
                 }
 
                 int sysFlag = 0;
+                boolean msgBodyCompressed = false;
                 if (this.tryToCompressMessage(msg)) {
                     sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+                    msgBodyCompressed = true;
                 }
 
                 final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -678,10 +680,19 @@ private SendResult sendKernelImpl(final Message msg,
                 SendResult sendResult = null;
                 switch (communicationMode) {
                     case ASYNC:
+                        Message tmpMessage = msg;
+                        if (msgBodyCompressed) {
+                            // If the msg body was compressed, msg's body must be reset to prevBody.
+                            // Clone a new message carrying the compressed body and restore the original message.
+                            // Fixes bug: https://github.com/apache/rocketmq-externals/issues/66
+                            tmpMessage = MessageAccessor.cloneMessage(msg);
+                            msg.setBody(prevBody);
+                        }
+
                         sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                             brokerAddr,
                             mq.getBrokerName(),
-                            msg,
+                            tmpMessage,
                             requestHeader,
                             timeout,
                             communicationMode,
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index ded22ada9..d3c6cc8b1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -69,6 +69,7 @@
     private DefaultMQProducer producer;
     private Message message;
     private Message zeroMsg;
+    private Message bigMessage;
     private String topic = "FooBar";
     private String producerGroupPrefix = "FooBar_PID";
 
@@ -77,8 +78,10 @@ public void init() throws Exception {
         String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
         producer = new DefaultMQProducer(producerGroupTemp);
         producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.setCompressMsgBodyOverHowmuch(16);
         message = new Message(topic, new byte[] {'a'});
         zeroMsg = new Message(topic, new byte[] {});
+        bigMessage = new Message(topic, "This is a very huge message!".getBytes());
 
         producer.start();
 
@@ -146,6 +149,52 @@ public void testSendMessageSync_Success() throws RemotingException, InterruptedE
         assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
     }
 
+    @Test
+    public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(bigMessage);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        producer.send(message, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+            }
+        });
+
+    }
+
+    @Test
+    public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        producer.send(bigMessage, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+            }
+        });
+
+    }
+
     @Test
     public void testSendMessageSync_SuccessWithHook() throws Throwable {
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 4cac404b9..1b7e2bba3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -89,4 +89,11 @@ public static String getConsumeStartTimeStamp(final Message msg) {
         return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
     }
 
+    public static Message cloneMessage(final Message msg) {
+        Message newMsg = new Message(msg.getTopic(), msg.getBody());
+        newMsg.setFlag(msg.getFlag());
+        newMsg.setProperties(msg.getProperties());
+        return newMsg;
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services