Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/11/08 10:39:19 UTC

[GitHub] [rocketmq] TerrellChen opened a new pull request #2406: Issue_690: support batch msgs in dledger

TerrellChen opened a new pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406


   ## What is the purpose of the change
   
   https://github.com/apache/rocketmq/issues/690
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could finish the following 5 checklist items (the last one is optional) before asking the community to review your PR`.
   
   - [x] Make sure there is a [GitHub issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes: one PR resolves one issue.
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocking where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add integration tests in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure the basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] TerrellChen commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520639304



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       LGTM!







[GitHub] [rocketmq] RongtongJin edited a comment on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin edited a comment on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-723563198


   Good job! I will review the code ASAP.





[GitHub] [rocketmq] RongtongJin merged pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin merged pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406


   





[GitHub] [rocketmq] RongtongJin commented on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-724604486


   @TerrellChen I have released dledger version 0.2.2, which includes the bug fix you contributed. You can upgrade the dledger version when you have time.





[GitHub] [rocketmq] TerrellChen commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520638172



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.totalMsgLen,
+                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);

Review comment:
       Good job!







[GitHub] [rocketmq] TerrellChen commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r522194383



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -609,7 +735,122 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
 
     @Override
     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
-        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status)));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            for (long pos : dledgerFuture.getPositions()) {
+                if (isFirstOffset) {
+                    wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       fixed

##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,127 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            for (long pos : dledgerFuture.getPositions()) {
+                if (isFirstOffset) {
+                    wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       fixed







[GitHub] [rocketmq] TerrellChen commented on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
TerrellChen commented on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-724773490


   @RongtongJin Thank you for all your assistance!
   I've pushed a commit that addresses your comment. Please check it again.





[GitHub] [rocketmq] coveralls edited a comment on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-724798593


   
   [![Coverage Status](https://coveralls.io/builds/34921896/badge)](https://coveralls.io/builds/34921896)
   
   Coverage increased (+0.4%) to 51.88% when pulling **4dbf7d6c9f8a05763d5202f8151b5ffa5bb32661 on TerrellChen:issue_690** into **0b600484b512f01d752e39ebb42c5a8b6e50fcb5 on apache:develop**.
   





[GitHub] [rocketmq] TerrellChen commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520639761



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());

Review comment:
       My fault!







[GitHub] [rocketmq] coveralls commented on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-724798593


   
   [![Coverage Status](https://coveralls.io/builds/34859580/badge)](https://coveralls.io/builds/34859580)
   
   Coverage increased (+0.2%) to 51.735% when pulling **6e7488353dc2c35b6fba097da1b5c28ced7d2a61 on TerrellChen:issue_690** into **03c1f11d6bc3034daca519de043fd4d2f69bb047 on apache:develop**.
   





[GitHub] [rocketmq] RongtongJin commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520945736



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -609,7 +735,122 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
 
     @Override
     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
-        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status)));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            for (long pos : dledgerFuture.getPositions()) {
+                if (isFirstOffset) {
+                    wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       This is my fault. With this code, wroteOffset is not updated for the messages after the first one, so those messages get the wrong msgId. We need to fix that while still ensuring that the wroteOffset of the first message is returned to the user.
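
The problem described above, and one possible way around it, can be sketched in a small stand-alone model. This is not the RocketMQ code: `BODY_OFFSET` is given a made-up value in place of `DLedgerEntry.BODY_OFFSET`, `fakeMsgId` is a hypothetical stand-in for `MessageDecoder.createMessageId`, and the class and method names are invented for illustration. The idea is to compute a per-message offset for each msgId and to keep the first offset in a separate variable for the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone model of the msgId loop; none of this is RocketMQ or DLedger code.
final class BatchMsgIdSketch {
    // Assumption: placeholder for DLedgerEntry.BODY_OFFSET, whose real value is not shown here.
    static final long BODY_OFFSET = 48L;

    // Hypothetical stand-in for MessageDecoder.createMessageId: hex-encodes the offset only.
    static String fakeMsgId(long offset) {
        return Long.toHexString(offset);
    }

    // Behaviour of the flagged loop: the offset is set once, so every ID repeats it.
    static List<String> idsWithFirstOffsetFlag(List<Long> positions) {
        List<String> ids = new ArrayList<>();
        long wroteOffset = 0;
        boolean isFirstOffset = true;
        for (long pos : positions) {
            if (isFirstOffset) {
                wroteOffset = pos + BODY_OFFSET;
                isFirstOffset = false;
            }
            ids.add(fakeMsgId(wroteOffset));
        }
        return ids;
    }

    // Holds the offset reported to the caller together with one ID per message.
    static final class Result {
        final long firstWroteOffset;
        final List<String> msgIds;

        Result(long firstWroteOffset, List<String> msgIds) {
            this.firstWroteOffset = firstWroteOffset;
            this.msgIds = msgIds;
        }
    }

    // One possible fix: each ID uses its own offset, and the first offset is kept separately.
    static Result idsWithPerMessageOffset(List<Long> positions) {
        List<String> ids = new ArrayList<>();
        long firstWroteOffset = 0;
        boolean isFirstOffset = true;
        for (long pos : positions) {
            long currentOffset = pos + BODY_OFFSET;
            if (isFirstOffset) {
                firstWroteOffset = currentOffset;
                isFirstOffset = false;
            }
            ids.add(fakeMsgId(currentOffset));
        }
        return new Result(firstWroteOffset, ids);
    }

    public static void main(String[] args) {
        List<Long> positions = Arrays.asList(0L, 100L, 250L);
        System.out.println("flagged loop: " + idsWithFirstOffsetFlag(positions));
        Result fixed = idsWithPerMessageOffset(positions);
        System.out.println("per-message:  " + fixed.msgIds + ", returned offset " + fixed.firstWroteOffset);
    }
}
```

In this model the flagged loop produces the same ID for every message, while the second variant produces distinct IDs and still reports the first offset. Whether the actual patch adopted this shape is not stated in this thread.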

##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,127 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            boolean isFirstOffset = true;
+            for (long pos : dledgerFuture.getPositions()) {
+                if (isFirstOffset) {
+                    wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                    isFirstOffset = false;
+                }
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       Same as below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] RongtongJin commented on pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#issuecomment-723563198


   Good job! I will review the code ASAP.





[GitHub] [rocketmq] RongtongJin commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r519521598



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());

Review comment:
       It would be better to remove `[DEBUG_CTR]`.

##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }

Review comment:
       Write the code like this:
   ```java
   boolean isFirstOffset = true;
   for (long pos : dledgerFuture.getPositions()) {
       if (isFirstOffset) {
           wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
           isFirstOffset = false;
       }
       String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
       if (msgIdBuilder.length() > 0) {
           msgIdBuilder.append(',').append(msgId);
       } else {
           msgIdBuilder.append(msgId);
       }
       msgNum++;
   }
   ```
   Or do you have a better solution?

##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.totalMsgLen,
+                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);

Review comment:
       When sending batch messages, wroteOffset should be the offset of the first message, but here it is the offset of the last message, so the unit test fails.
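
The difference between the two offsets can be shown with a minimal stand-alone sketch. It is only a model under stated assumptions: `BODY_OFFSET` uses a made-up value in place of `DLedgerEntry.BODY_OFFSET`, and the class and method names are hypothetical and do not exist in RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone model of which offset ends up in the append result; not RocketMQ code.
final class ReturnedOffsetSketch {
    // Assumption: placeholder for DLedgerEntry.BODY_OFFSET, whose real value is not shown here.
    static final long BODY_OFFSET = 48L;

    // Mirrors the loop under review: the variable is overwritten on every pass,
    // so after the loop it holds the offset of the last message.
    static long overwrittenEachPass(List<Long> positions) {
        long wroteOffset = 0;
        for (long pos : positions) {
            wroteOffset = pos + BODY_OFFSET;
        }
        return wroteOffset;
    }

    // What the comment asks for: the offset of the first message in the batch.
    static long firstMessageOffset(List<Long> positions) {
        return positions.isEmpty() ? 0 : positions.get(0) + BODY_OFFSET;
    }

    public static void main(String[] args) {
        List<Long> positions = Arrays.asList(0L, 100L, 250L);
        System.out.println("overwritten each pass: " + overwrittenEachPass(positions));
        System.out.println("first message:         " + firstMessageOffset(positions));
    }
}
```

For a batch with a single message both methods agree, which may be why the mismatch only shows up in tests that send more than one message.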



