You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/11/10 15:12:12 UTC

[GitHub] [rocketmq] TerrellChen commented on a change in pull request #2406: [ISSUE #690] Support batch msgs in dledger mode

TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520638172



##########
File path: store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
     @Override
     public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
-        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+        if (messageExtBatch.getDelayTimeLevel() > 0) {
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        // Set the storage time
+        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setBornHostV6Flag();
+        }
+
+        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+            messageExtBatch.setStoreHostAddressV6Flag();
+        }
+
+        // Back to Results
+        AppendMessageResult appendResult;
+        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+        EncodeResult encodeResult;
+
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        msgIdBuilder.setLength(0);
+        long elapsedTimeInLock;
+        long queueOffset;
+        long msgNum = 0;
+        try {
+            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+            encodeResult = this.messageSerializer.serialize(messageExtBatch);
+            queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+                        .status));
+            }
+            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setGroup(dLedgerConfig.getGroup());
+            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+            request.setBatchMsgs(encodeResult.batchData);
+            dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+            if (dledgerFuture.getPos() == -1) {
+                log.warn("[DEBUG_CTR] handleAppend return false due to error code {}", dledgerFuture.get().getCode());
+                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+            }
+            long wroteOffset = 0;
+
+            int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+            for (long pos : dledgerFuture.getPositions()) {
+                wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+                String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+                if (msgIdBuilder.length() > 0) {
+                    msgIdBuilder.append(',').append(msgId);
+                } else {
+                    msgIdBuilder.append(msgId);
+                }
+                msgNum++;
+            }
+            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.totalMsgLen,
+                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+            DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);

Review comment:
       Good job!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org