You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/06 10:00:11 UTC

[GitHub] [rocketmq] guyinyou opened a new pull request #3718: [ISSUE #3717][RIP-27] Auto batch in producer

guyinyou opened a new pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718


   **Make sure set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   #3717 
   
   ## Brief changelog
   
   Realize the autobatch function in the producer.
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780869680



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // totalHoldSize normal value
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // holdSize normal value
+    private long holdSize = 32 * 1024;
+    // holdMs normal value
+    private int holdMs = 10;
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+        this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+    }
+
+    class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                v.wakeup();
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                if (v.readyToSend()) {
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    public void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    public void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    public int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    public void batchMaxDelayMs(int holdMs) {
+        if (holdMs <= 0 || holdMs > 30 * 1000) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    public long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void batchMaxBytes(long holdSize) {
+        if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    public long getTotalBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (totalHoldSize <= 0) {
+            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        return syncSendBatchs.computeIfAbsent(aggregateKey, (key -> new MessageAccumulation(aggregateKey, defaultMQProducer)));
+    }
+
+    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        return asyncSendBatchs.computeIfAbsent(aggregateKey, (key -> new MessageAccumulation(aggregateKey, defaultMQProducer)));
+    }
+
+    public SendResult send(Message msg,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    public SendResult send(Message msg, MessageQueue mq,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    public void send(Message msg, SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    public void send(Message msg, MessageQueue mq,
+        SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    public boolean tryAddMessage(Message message) {
+        synchronized (currentlyHoldSize) {
+            if (currentlyHoldSize.get() < totalHoldSize) {
+                currentlyHoldSize.addAndGet(message.getBody().length);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private class AggregateKey {
+        public String topic = null;
+        public MessageQueue mq = null;
+        public boolean waitStoreMsgOK = false;
+        public String tag = null;
+
+        public AggregateKey(Message message) {
+            this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(Message message, MessageQueue mq) {
+            this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+            this.topic = topic;
+            this.mq = mq;
+            this.waitStoreMsgOK = waitStoreMsgOK;
+            this.tag = tag;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            AggregateKey key = (AggregateKey) o;
+            return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+        }
+    }
+
+    private class MessageAccumulation {
+        private final DefaultMQProducer defaultMQProducer;
+        private LinkedList<Message> messages;
+        private LinkedList<SendCallback> sendCallbacks;
+        private Set<String> keys;
+        private AtomicBoolean closed;
+        private SendResult[] sendResults;
+        private AggregateKey aggregateKey;
+        private AtomicInteger messagesSize;
+        private int count;
+        private long createTime;
+
+        public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+            this.defaultMQProducer = defaultMQProducer;
+            this.messages = new LinkedList<>();
+            this.sendCallbacks = new LinkedList<>();
+            this.keys = new HashSet<>();
+            this.closed = new AtomicBoolean(false);
+            this.messagesSize = new AtomicInteger(0);
+            this.aggregateKey = aggregateKey;
+            this.count = 0;
+            this.createTime = System.currentTimeMillis();
+        }
+
+        private boolean readyToSend() {
+            if (this.messagesSize.get() > holdSize
+                || System.currentTimeMillis() >= this.createTime + holdMs) {
+                return true;
+            }
+            return false;
+        }
+
+        public int add(
+            Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+            int ret = -1;
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return ret;
+                }
+                ret = this.count++;
+                this.messages.add(msg);
+                messagesSize.addAndGet(msg.getBody().length);
+                String msgKeys = msg.getKeys();
+                if (msgKeys != null) {
+                    this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+                }
+            }
+            synchronized (this) {
+                while (!this.closed.get()) {
+                    if (readyToSend()) {
+                        this.send();
+                        break;
+                    } else {
+                        this.wait();
+                    }
+                }
+                return ret;
+            }
+        }
+
+        public boolean add(Message msg,
+            SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return false;
+                }
+                this.count++;
+                this.messages.add(msg);
+                this.sendCallbacks.add(sendCallback);
+                messagesSize.getAndAdd(msg.getBody().length);
+            }
+            if (readyToSend()) {
+                this.send(sendCallback);
+            }
+            return true;
+
+        }
+
+        public synchronized void wakeup() {
+            if (this.closed.get()) {
+                return;
+            }
+            this.notify();
+        }
+
+        private MessageBatch batch() {
+            MessageBatch messageBatch = new MessageBatch(this.messages);
+            messageBatch.setTopic(this.aggregateKey.topic);
+            messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+            messageBatch.setKeys(this.keys);
+            messageBatch.setTags(this.aggregateKey.tag);
+            messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+            return messageBatch;
+        }
+
+        private void splitSendResults(SendResult sendResult) {
+            if (sendResult == null) {
+                throw new IllegalArgumentException("sendResult is null");
+            }
+            boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
+            this.sendResults = new SendResult[this.count];
+            if (!isBatchConsumerQueue) {
+                String[] msgIds = sendResult.getMsgId().split(",");
+                String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+                if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
+                    throw new IllegalArgumentException("sendResult is illegal");
+                }
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
+                        sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
+                        sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
+                }
+            } else {
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = sendResult;
+                }
+            }
+        }
+
+        private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            MessageBatch messageBatch = this.batch();
+            SendResult sendResult = null;
+            try {
+                if (defaultMQProducer != null) {
+                    sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
+                    this.splitSendResults(sendResult);

Review comment:
       if sendResult is null, it will throw new exception in splitSendResults function, see org/apache/rocketmq/client/producer/ProduceAccumulator.java:403 for detail




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] RongtongJin commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1006573209


   refer #https://github.com/apache/rocketmq/wiki/RIP-27-Auto-batching-in-producer
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1022793052


   @yuz10 There is no lamda expression in jdk1.6. For compatibility with it, replace computeIfAbsent with putIfAbsent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] codecov-commenter commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1008516915


   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3718](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (450a164) into [develop](https://codecov.io/gh/apache/rocketmq/commit/7d1b001c2fe50f9c22096efdc42a42b60d3ee2d6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7d1b001) will **decrease** coverage by `2.57%`.
   > The diff coverage is `54.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3718/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #3718      +/-   ##
   =============================================
   - Coverage      49.76%   47.18%   -2.58%     
   - Complexity      4732     5037     +305     
   =============================================
     Files            555      629      +74     
     Lines          36770    41778    +5008     
     Branches        4847     5429     +582     
   =============================================
   + Hits           18297    19713    +1416     
   - Misses         16185    19606    +3421     
   - Partials        2288     2459     +171     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rocketmq/client/impl/factory/MQClientInstance.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9mYWN0b3J5L01RQ2xpZW50SW5zdGFuY2UuamF2YQ==) | `51.30% <0.00%> (-2.00%)` | :arrow_down: |
   | [...he/rocketmq/client/producer/DefaultMQProducer.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvRGVmYXVsdE1RUHJvZHVjZXIuamF2YQ==) | `51.51% <42.85%> (-5.84%)` | :arrow_down: |
   | [...mq/client/impl/producer/DefaultMQProducerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9wcm9kdWNlci9EZWZhdWx0TVFQcm9kdWNlckltcGwuamF2YQ==) | `45.75% <47.82%> (-0.31%)` | :arrow_down: |
   | [...e/rocketmq/client/producer/ProduceAccumulator.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvUHJvZHVjZUFjY3VtdWxhdG9yLmphdmE=) | `59.23% <59.23%> (ø)` | |
   | [...g/apache/rocketmq/client/impl/MQClientManager.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudE1hbmFnZXIuamF2YQ==) | `88.00% <100.00%> (+1.63%)` | :arrow_up: |
   | [...g/apache/rocketmq/common/message/MessageBatch.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vbWVzc2FnZS9NZXNzYWdlQmF0Y2guamF2YQ==) | `81.48% <100.00%> (ø)` | |
   | [.../rocketmq/client/producer/RequestFutureHolder.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvUmVxdWVzdEZ1dHVyZUhvbGRlci5qYXZh) | `81.39% <0.00%> (-12.36%)` | :arrow_down: |
   | [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...pache/rocketmq/store/stats/BrokerStatsManager.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3N0YXRzL0Jyb2tlclN0YXRzTWFuYWdlci5qYXZh) | `71.87% <0.00%> (-4.64%)` | :arrow_down: |
   | [...ache/rocketmq/common/stats/MomentStatsItemSet.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vc3RhdHMvTW9tZW50U3RhdHNJdGVtU2V0LmphdmE=) | `39.13% <0.00%> (-4.35%)` | :arrow_down: |
   | ... and [104 more](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [fe98bed...450a164](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1008516915


   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3718](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b697a49) into [develop](https://codecov.io/gh/apache/rocketmq/commit/7d1b001c2fe50f9c22096efdc42a42b60d3ee2d6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7d1b001) will **decrease** coverage by `2.45%`.
   > The diff coverage is `60.10%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3718/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #3718      +/-   ##
   =============================================
   - Coverage      49.76%   47.30%   -2.46%     
   - Complexity      4732     5050     +318     
   =============================================
     Files            555      629      +74     
     Lines          36770    41778    +5008     
     Branches        4847     5429     +582     
   =============================================
   + Hits           18297    19765    +1468     
   - Misses         16185    19549    +3364     
   - Partials        2288     2464     +176     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rocketmq/client/impl/factory/MQClientInstance.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9mYWN0b3J5L01RQ2xpZW50SW5zdGFuY2UuamF2YQ==) | `53.13% <0.00%> (-0.16%)` | :arrow_down: |
   | [...he/rocketmq/client/producer/DefaultMQProducer.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvRGVmYXVsdE1RUHJvZHVjZXIuamF2YQ==) | `51.51% <42.85%> (-5.84%)` | :arrow_down: |
   | [...mq/client/impl/producer/DefaultMQProducerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9wcm9kdWNlci9EZWZhdWx0TVFQcm9kdWNlckltcGwuamF2YQ==) | `45.75% <47.82%> (-0.31%)` | :arrow_down: |
   | [...e/rocketmq/client/producer/ProduceAccumulator.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvUHJvZHVjZUFjY3VtdWxhdG9yLmphdmE=) | `67.30% <67.30%> (ø)` | |
   | [...g/apache/rocketmq/client/impl/MQClientManager.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudE1hbmFnZXIuamF2YQ==) | `88.00% <100.00%> (+1.63%)` | :arrow_up: |
   | [...g/apache/rocketmq/common/message/MessageBatch.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vbWVzc2FnZS9NZXNzYWdlQmF0Y2guamF2YQ==) | `81.48% <100.00%> (ø)` | |
   | [...rocketmq/broker/filtersrv/FilterServerManager.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmlsdGVyc3J2L0ZpbHRlclNlcnZlck1hbmFnZXIuamF2YQ==) | `20.00% <0.00%> (-14.29%)` | :arrow_down: |
   | [.../rocketmq/client/producer/RequestFutureHolder.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvUmVxdWVzdEZ1dHVyZUhvbGRlci5qYXZh) | `81.39% <0.00%> (-12.36%)` | :arrow_down: |
   | [...tmq/logappender/log4j2/RocketmqLog4j2Appender.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nYXBwZW5kZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL2xvZ2FwcGVuZGVyL2xvZzRqMi9Sb2NrZXRtcUxvZzRqMkFwcGVuZGVyLmphdmE=) | `35.00% <0.00%> (-10.00%)` | :arrow_down: |
   | [...pache/rocketmq/store/stats/BrokerStatsManager.java](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3N0YXRzL0Jyb2tlclN0YXRzTWFuYWdlci5qYXZh) | `71.87% <0.00%> (-4.64%)` | :arrow_down: |
   | ... and [101 more](https://codecov.io/gh/apache/rocketmq/pull/3718/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [fe98bed...b697a49](https://codecov.io/gh/apache/rocketmq/pull/3718?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r782872852



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -197,20 +208,21 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
     /**
      * Constructor specifying namespace, producer group and RPC hook.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command execution.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);

Review comment:
       ```
   producer.setAutoBatch(true);
   producer.batchMaxDelayMs(xxx);
   producer.batchMaxBytes(xxx);
   producer.totalBatchMaxBytes(xxx);
   ```
   The usage of the interface, after being exposed to the user, should support that he can call at any time




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780668774



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // totalHoldSize normal value
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // holdSize normal value
+    private long holdSize = 32 * 1024;
+    // holdMs normal value
+    private int holdMs = 10;
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+        this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+    }
+
+    class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                v.wakeup();
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                if (v.readyToSend()) {
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    public void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    public void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    public int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    public void batchMaxDelayMs(int holdMs) {
+        if (holdMs <= 0 || holdMs > 30 * 1000) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    public long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void batchMaxBytes(long holdSize) {
+        if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    public long getTotalBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (totalHoldSize <= 0) {
+            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        return syncSendBatchs.computeIfAbsent(aggregateKey, (key -> new MessageAccumulation(aggregateKey, defaultMQProducer)));
+    }
+
+    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        return asyncSendBatchs.computeIfAbsent(aggregateKey, (key -> new MessageAccumulation(aggregateKey, defaultMQProducer)));
+    }
+
+    public SendResult send(Message msg,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    public SendResult send(Message msg, MessageQueue mq,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey, defaultMQProducer);
+            int index = batch.add(msg);
+            if (index == -1) {
+                syncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return batch.sendResults[index];
+            }
+        }
+    }
+
+    public void send(Message msg, SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    public void send(Message msg, MessageQueue mq,
+        SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        AggregateKey partitionKey = new AggregateKey(msg, mq);
+        while (true) {
+            MessageAccumulation batch = getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+            if (!batch.add(msg, sendCallback)) {
+                asyncSendBatchs.remove(partitionKey, batch);
+            } else {
+                return;
+            }
+        }
+    }
+
+    public boolean tryAddMessage(Message message) {
+        synchronized (currentlyHoldSize) {
+            if (currentlyHoldSize.get() < totalHoldSize) {
+                currentlyHoldSize.addAndGet(message.getBody().length);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private class AggregateKey {
+        public String topic = null;
+        public MessageQueue mq = null;
+        public boolean waitStoreMsgOK = false;
+        public String tag = null;
+
+        public AggregateKey(Message message) {
+            this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(Message message, MessageQueue mq) {
+            this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+            this.topic = topic;
+            this.mq = mq;
+            this.waitStoreMsgOK = waitStoreMsgOK;
+            this.tag = tag;
+        }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            AggregateKey key = (AggregateKey) o;
+            return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+        }
+    }
+
+    private class MessageAccumulation {
+        private final DefaultMQProducer defaultMQProducer;
+        private LinkedList<Message> messages;
+        private LinkedList<SendCallback> sendCallbacks;
+        private Set<String> keys;
+        private AtomicBoolean closed;
+        private SendResult[] sendResults;
+        private AggregateKey aggregateKey;
+        private AtomicInteger messagesSize;
+        private int count;
+        private long createTime;
+
+        public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+            this.defaultMQProducer = defaultMQProducer;
+            this.messages = new LinkedList<>();
+            this.sendCallbacks = new LinkedList<>();
+            this.keys = new HashSet<>();
+            this.closed = new AtomicBoolean(false);
+            this.messagesSize = new AtomicInteger(0);
+            this.aggregateKey = aggregateKey;
+            this.count = 0;
+            this.createTime = System.currentTimeMillis();
+        }
+
+        private boolean readyToSend() {
+            if (this.messagesSize.get() > holdSize
+                || System.currentTimeMillis() >= this.createTime + holdMs) {
+                return true;
+            }
+            return false;
+        }
+
+        public int add(
+            Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+            int ret = -1;
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return ret;
+                }
+                ret = this.count++;
+                this.messages.add(msg);
+                messagesSize.addAndGet(msg.getBody().length);
+                String msgKeys = msg.getKeys();
+                if (msgKeys != null) {
+                    this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+                }
+            }
+            synchronized (this) {
+                while (!this.closed.get()) {
+                    if (readyToSend()) {
+                        this.send();
+                        break;
+                    } else {
+                        this.wait();
+                    }
+                }
+                return ret;
+            }
+        }
+
+        public boolean add(Message msg,
+            SendCallback sendCallback) throws InterruptedException, RemotingException, MQClientException {
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return false;
+                }
+                this.count++;
+                this.messages.add(msg);
+                this.sendCallbacks.add(sendCallback);
+                messagesSize.getAndAdd(msg.getBody().length);
+            }
+            if (readyToSend()) {
+                this.send(sendCallback);
+            }
+            return true;
+
+        }
+
+        public synchronized void wakeup() {
+            if (this.closed.get()) {
+                return;
+            }
+            this.notify();
+        }
+
+        private MessageBatch batch() {
+            MessageBatch messageBatch = new MessageBatch(this.messages);
+            messageBatch.setTopic(this.aggregateKey.topic);
+            messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+            messageBatch.setKeys(this.keys);
+            messageBatch.setTags(this.aggregateKey.tag);
+            messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+            return messageBatch;
+        }
+
+        private void splitSendResults(SendResult sendResult) {
+            if (sendResult == null) {
+                throw new IllegalArgumentException("sendResult is null");
+            }
+            boolean isBatchConsumerQueue = !sendResult.getMsgId().contains(",");
+            this.sendResults = new SendResult[this.count];
+            if (!isBatchConsumerQueue) {
+                String[] msgIds = sendResult.getMsgId().split(",");
+                String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+                if (offsetMsgIds.length != this.count || msgIds.length != this.count) {
+                    throw new IllegalArgumentException("sendResult is illegal");
+                }
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = new SendResult(sendResult.getSendStatus(), msgIds[i],
+                        sendResult.getMessageQueue(), sendResult.getQueueOffset() + i,
+                        sendResult.getTransactionId(), offsetMsgIds[i], sendResult.getRegionId());
+                }
+            } else {
+                for (int i = 0; i < this.count; i++) {
+                    this.sendResults[i] = sendResult;
+                }
+            }
+        }
+
+        private void send() throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            MessageBatch messageBatch = this.batch();
+            SendResult sendResult = null;
+            try {
+                if (defaultMQProducer != null) {
+                    sendResult = defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, null);
+                    this.splitSendResults(sendResult);

Review comment:
       if send fail, splitSendResults is not called, and this.sendResults is null, will cause NPE in next step




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1077271438


   Since the new broker processes the messageBatch as a single message, it should not support the case of multiple topics in one messageBatch @duhenglucky 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780647995



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       We can use the benchmark to test whether the throughput is increased, and how many threads should use. If on most cases    the tps is worse, I would still recommend to disable batch support in sync mode




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780013319



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       I think we should forbid batch if messege is sent in sync mode, if the messages is sent in a for loop, then time is wasted  when each message is waiting for the previous batch result.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] tianliuliu commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
tianliuliu commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1017235782


   Whether there will be a performance improvement report in the future when using this feature


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] coveralls edited a comment on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1008516873


   
   [![Coverage Status](https://coveralls.io/builds/45976121/badge)](https://coveralls.io/builds/45976121)
   
   Coverage increased (+0.2%) to 51.313% when pulling **a72c5c68eeb43d6a968025cd3b3eb3039b5846dc on guyinyou:develop_feature_client_autoBatch** into **8ce2cdb004a6b968ad51a985fa262b84fa97d041 on apache:develop**.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780078309



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       Thank you for your suggestion. In the synchronous sending mode, we recommend that users use multi-threading, so that the same effect can be achieved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780078309



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       Thank you for your suggestion. In the synchronous sending mode, we recommend that users use multi-threading, so that the same effect can be achieved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r779654705



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
##########
@@ -40,6 +43,23 @@ public static MQClientManager getInstance() {
         return instance;
     }
 
+    public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clientConfig) {
+        String clientId = clientConfig.buildMQClientId();
+        ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+        if (null == accumulator) {
+            accumulator = new ProduceAccumulator(clientId);
+            ProduceAccumulator prev = this.accumulatorTable.putIfAbsent(clientId, accumulator);

Review comment:
       two steps of get and put to map can be simplified with computeIfAbsent

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // totalHoldSize normal value
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // holdSize normal value
+    private long holdSize = 32 * 1024;
+    // holdMs normal value
+    private int holdMs = 10;
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+        this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+    }
+
+    class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                v.wakeup();
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                if (v.readyToSend()) {
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    public void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    public void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    public int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    public void batchMaxDelayMs(int holdMs) {
+        if (holdMs <= 0 || holdMs > 30 * 1000) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    public long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void batchMaxBytes(long holdSize) {
+        if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    public long getTotalBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (totalHoldSize <= 0) {
+            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+        if (batch != null) {
+            return batch;
+        }
+        batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);

Review comment:
       can be simplified with computeIfAbsent

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // totalHoldSize normal value
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // holdSize normal value
+    private long holdSize = 32 * 1024;
+    // holdMs normal value
+    private int holdMs = 10;
+    private static final InternalLogger log = ClientLogger.getLog();
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        this.guardThreadForSyncSend = new GuardForSyncSendService(this.instanceName);
+        this.guardThreadForAsyncSend = new GuardForAsyncSendService(this.instanceName);
+    }
+
+    class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                v.wakeup();
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        @Override public void run() {
+            ProduceAccumulator.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    ProduceAccumulator.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            ProduceAccumulator.log.info(this.getServiceName() + " service end");
+        }
+
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            final int sleepTime = Math.max(1, holdMs / 2);
+            values.parallelStream().forEach((v) -> {
+                if (v.readyToSend()) {
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            });
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    public void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    public void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    public int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    public void batchMaxDelayMs(int holdMs) {
+        if (holdMs <= 0 || holdMs > 30 * 1000) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    public long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void batchMaxBytes(long holdSize) {
+        if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    public long getTotalBatchMaxBytes() {
+        return holdSize;
+    }
+
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (totalHoldSize <= 0) {
+            throw new IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize));
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+        if (batch != null) {
+            return batch;
+        }
+        batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        MessageAccumulation previous = syncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+        return previous == null ? batch : previous;
+    }
+
+    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
+        if (batch != null) {
+            return batch;
+        }
+        batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        MessageAccumulation previous = asyncSendBatchs.putIfAbsent(aggregateKey, batch);

Review comment:
       can be simplified with computeIfAbsent

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       I think we should forbid batch if messege is sent in sync mode, if the messages is sent in a for loop, then time is wasted 
    when each message is waiting for the previous batch result.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780647851



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       We can use the benchmark to test whether the throughput is increased, and how many threads should use. If on most cases    the tps is worse, I would still recommend to disable batch support in sync mode




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] coveralls edited a comment on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1008516873


   
   [![Coverage Status](https://coveralls.io/builds/45593060/badge)](https://coveralls.io/builds/45593060)
   
   Coverage increased (+0.04%) to 53.312% when pulling **b697a491bb67241b6a54278dede9c676fc7598c3 on guyinyou:develop_feature_client_autoBatch** into **ed5f4e4eac71849bc479aee2db7cc931d262a184 on apache:develop**.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] odbozhou commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
odbozhou commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r781789410



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -197,20 +208,21 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
     /**
      * Constructor specifying namespace, producer group and RPC hook.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command execution.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);

Review comment:
       Should it be judged that autoBatch is true when creating produceAccumulator?

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -233,18 +245,19 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin
      * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
      * name.
      *
-     * @param namespace Namespace for this MQ Producer instance.
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param namespace            Namespace for this MQ Producer instance.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param rpcHook              RPC hook to execute per each remoting command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
         boolean enableMsgTrace, final String customizedTraceTopic) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);

Review comment:
       the same as above

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -296,6 +312,7 @@ public void start() throws MQClientException {
     @Override
     public void shutdown() {
         this.defaultMQProducerImpl.shutdown();
+        this.produceAccumulator.shutdown();

Review comment:
       the same as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] guyinyou commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
guyinyou commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1022793591


   > Whether there will be a performance improvement report in the future when using this feature
   
   I plan to do some benchmarks after Chinese New Year, now here I wish you a happy new year.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] coveralls commented on pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#issuecomment-1008516873


   
   [![Coverage Status](https://coveralls.io/builds/45479161/badge)](https://coveralls.io/builds/45479161)
   
   Coverage decreased (-0.1%) to 53.171% when pulling **450a16437368807c9f5ef57f668fa0399b89d012 on guyinyou:develop_feature_client_autoBatch** into **ed5f4e4eac71849bc479aee2db7cc931d262a184 on apache:develop**.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yuz10 commented on a change in pull request #3718: [ISSUE #3717][RIP-27] Auto batching in producer

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #3718:
URL: https://github.com/apache/rocketmq/pull/3718#discussion_r780647851



##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       We can use the benchmark to test whether the throughput is increased, and how many threads should use. If on most cases    the tps is worse, I would still recommend to disable batch support in sync mode

##########
File path: client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
##########
@@ -323,28 +340,54 @@ public void shutdown() {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
+
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support batch processing
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);

Review comment:
       We can use the benchmark to test whether the throughput is increased, and how many threads should use. If on most cases    the tps is worse, I would still recommend to disable batch support in sync mode




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org