You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/05/20 02:34:27 UTC
[rocketmq] 01/01: Revert "[ISSUE #2865] Batch message send bug fix (#2866)"
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch revert-2866-batch_msg_bug_fix
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit d7f865ec22c6bd52cb8499824929a01f1a639029
Author: von gosling <vo...@apache.org>
AuthorDate: Thu May 20 10:34:14 2021 +0800
Revert "[ISSUE #2865] Batch message send bug fix (#2866)"
This reverts commit 16694485a7e111c601c0ec8c2a76a44cdd830067.
---
.../rocketmq/store/dledger/DLedgerCommitLog.java | 58 +++++++++++-----------
1 file changed, 29 insertions(+), 29 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 9241ffe..9a6e7a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -426,18 +426,17 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
- encodeResult = this.messageSerializer.serialize(msg);
- if (encodeResult.status != AppendMessageStatus.PUT_OK) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
- }
-
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+ encodeResult = this.messageSerializer.serialize(msg);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, false);
+ encodeResult.setQueueOffsetKey(queueOffset);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
+ }
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -543,12 +542,6 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
- encodeResult = this.messageSerializer.serialize(messageExtBatch);
- if (encodeResult.status != AppendMessageStatus.PUT_OK) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
- .status));
- }
-
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
@@ -556,8 +549,12 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
- queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, true);
+ encodeResult = this.messageSerializer.serialize(messageExtBatch);
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+ .status));
+ }
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -667,7 +664,7 @@ public class DLedgerCommitLog extends CommitLog {
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, false);
+ encodeResult.setQueueOffsetKey(queueOffset);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -782,8 +779,7 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
- queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, true);
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
@@ -961,15 +957,8 @@ public class DLedgerCommitLog extends CommitLog {
this.queueOffsetKey = queueOffsetKey;
}
- public void setQueueOffsetKey(long offset, boolean isBatch) {
- if (!isBatch) {
- this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
- return;
- }
-
- for (byte[] data : batchData) {
- ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++);
- }
+ public void setQueueOffsetKey(long offset) {
+ data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
}
public byte[] getData() {
@@ -988,6 +977,8 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message
private final int maxMessageSize;
+ // Build Message Key
+ private final StringBuilder keyBuilder = new StringBuilder();
MessageSerializer(final int size) {
this.maxMessageSize = size;
@@ -1088,7 +1079,17 @@ public class DLedgerCommitLog extends CommitLog {
}
public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
- String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
+ keyBuilder.setLength(0);
+ keyBuilder.append(messageExtBatch.getTopic());
+ keyBuilder.append('-');
+ keyBuilder.append(messageExtBatch.getQueueId());
+ String key = keyBuilder.toString();
+
+ Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+ if (null == queueOffset) {
+ queueOffset = 0L;
+ DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+ }
int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
@@ -1153,7 +1154,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG
msgStoreItemMemory.putInt(flag);
// 6 QUEUEOFFSET
- msgStoreItemMemory.putLong(0L);
+ msgStoreItemMemory.putLong(queueOffset++);
// 7 PHYSICALOFFSET
msgStoreItemMemory.putLong(0);
// 8 SYSFLAG
@@ -1209,7 +1210,6 @@ public class DLedgerCommitLog extends CommitLog {
this.sbr = sbr;
}
- @Override
public synchronized void release() {
super.release();
if (sbr != null) {