You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/07/28 06:50:05 UTC
[rocketmq] branch develop updated: [ISSUE #2883] [Part E] Improve
produce performance in M/S mode. (#2889)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0325772 [ISSUE #2883] [Part E] Improve produce performance in M/S mode. (#2889)
0325772 is described below
commit 032577255ecd543908446aa272668c9f882bcce5
Author: huangli <ar...@gmail.com>
AuthorDate: Wed Jul 28 14:49:45 2021 +0800
[ISSUE #2883] [Part E] Improve produce performance in M/S mode. (#2889)
* Remove the putMessage/putMessages methods in CommitLog, which contain too much duplicated code.
* Optimise performance of asyncPutMessage (extract some code out of putMessage lock)
* extract generation of msgId out of lock in CommitLog (now only for single message processor)
* extract generation of topicQueueTable key out of sync code
* extract generation of msgId out of lock in CommitLog (for batch)
* fix ipv6 problem introduced in commit "Optimise performance of asyncPutMessage (extract some code out of putMessage lock)"
---
.../rocketmq/store/AppendMessageCallback.java | 5 +-
.../apache/rocketmq/store/AppendMessageResult.java | 17 +
.../java/org/apache/rocketmq/store/CommitLog.java | 760 ++++++++-------------
.../apache/rocketmq/store/DefaultMessageStore.java | 55 +-
.../java/org/apache/rocketmq/store/MappedFile.java | 20 +-
.../rocketmq/store/MessageExtBrokerInner.java | 12 +
.../rocketmq/store/dledger/DLedgerCommitLog.java | 232 -------
.../apache/rocketmq/store/AppendCallbackTest.java | 28 +-
8 files changed, 361 insertions(+), 768 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index d337638..5499c90 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
/**
* Write messages callback interface
@@ -30,7 +31,7 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBrokerInner msg);
+ final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext);
/**
* After batched message serialization, write MapedByteBuffer
@@ -39,5 +40,5 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBatch messageExtBatch);
+ final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index d6d1aa6..de3c03b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.store;
+import java.util.function.Supplier;
+
/**
* When write a message to the commit log, returns results
*/
@@ -28,6 +30,7 @@ public class AppendMessageResult {
private int wroteBytes;
// Message ID
private String msgId;
+ private Supplier<String> msgIdSupplier;
// Message storage timestamp
private long storeTimestamp;
// Consume queue's offset(step by one)
@@ -51,6 +54,17 @@ public class AppendMessageResult {
this.pagecacheRT = pagecacheRT;
}
+ public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier<String> msgIdSupplier,
+ long storeTimestamp, long logicsOffset, long pagecacheRT) {
+ this.status = status;
+ this.wroteOffset = wroteOffset;
+ this.wroteBytes = wroteBytes;
+ this.msgIdSupplier = msgIdSupplier;
+ this.storeTimestamp = storeTimestamp;
+ this.logicsOffset = logicsOffset;
+ this.pagecacheRT = pagecacheRT;
+ }
+
public long getPagecacheRT() {
return pagecacheRT;
}
@@ -88,6 +102,9 @@ public class AppendMessageResult {
}
public String getMsgId() {
+ if (msgId == null && msgIdSupplier != null) {
+ msgId = msgIdSupplier.get();
+ }
return msgId;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 57fa363..5e92654 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -16,17 +16,18 @@
*/
package org.apache.rocketmq.store;
+import java.net.Inet4Address;
import java.net.Inet6Address;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
@@ -62,7 +63,7 @@ public class CommitLog {
private final FlushCommitLogService commitLogService;
private final AppendMessageCallback appendMessageCallback;
- private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
+ private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
@@ -84,10 +85,10 @@ public class CommitLog {
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
- batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
+ putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
- protected MessageExtBatchEncoder initialValue() {
- return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+ protected PutMessageThreadLocal initialValue() {
+ return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -555,6 +556,14 @@ public class CommitLog {
return beginTimeInLock;
}
+ private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
+ keyBuilder.setLength(0);
+ keyBuilder.append(messageExt.getTopic());
+ keyBuilder.append('-');
+ keyBuilder.append(messageExt.getQueueId());
+ return keyBuilder.toString();
+ }
+
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
@@ -591,12 +600,30 @@ public class CommitLog {
}
}
+ InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ msg.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ msg.setStoreHostAddressV6Flag();
+ }
+
+ PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
+ PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
+ if (encodeResult != null) {
+ return CompletableFuture.completedFuture(encodeResult);
+ }
+ msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
+ PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
+
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
@@ -613,7 +640,7 @@ public class CommitLog {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
- result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+ result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
@@ -627,7 +654,7 @@ public class CommitLog {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
- result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+ result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
@@ -693,14 +720,26 @@ public class CommitLog {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
+ InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setStoreHostAddressV6Flag();
+ }
+
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//fine-grained lock instead of the coarse-grained
- MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
+ PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+ MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+ PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
+ messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
putMessageLock.lock();
try {
@@ -720,7 +759,7 @@ public class CommitLog {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
- result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
+ result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
@@ -734,7 +773,7 @@ public class CommitLog {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
- result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
+ result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
@@ -784,129 +823,6 @@ public class CommitLog {
}
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
- // Set the storage time
- msg.setStoreTimestamp(System.currentTimeMillis());
- // Set the message body BODY CRC (consider the most appropriate setting
- // on the client)
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
- // Back to Results
- AppendMessageResult result = null;
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // Delay Delivery
- if (msg.getDelayTimeLevel() > 0) {
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
-
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
- // Backup real topic, queueId
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
-
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
-
- InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
- if (bornSocketAddress.getAddress() instanceof Inet6Address) {
- msg.setBornHostV6Flag();
- }
-
- InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
- if (storeSocketAddress.getAddress() instanceof Inet6Address) {
- msg.setStoreHostAddressV6Flag();
- }
-
- long elapsedTimeInLock = 0;
-
- MappedFile unlockMappedFile = null;
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
- try {
- long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
- this.beginTimeInLock = beginLockTimestamp;
-
- // Here settings are stored timestamp, in order to ensure an orderly
- // global
- msg.setStoreTimestamp(beginLockTimestamp);
-
- if (null == mappedFile || mappedFile.isFull()) {
- mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
- }
- if (null == mappedFile) {
- log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
- }
-
- result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- switch (result.getStatus()) {
- case PUT_OK:
- break;
- case END_OF_FILE:
- unlockMappedFile = mappedFile;
- // Create a new file, re-write the message
- mappedFile = this.mappedFileQueue.getLastMappedFile(0);
- if (null == mappedFile) {
- // XXX: warn and notify me
- log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
- }
- result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- break;
- case MESSAGE_SIZE_EXCEEDED:
- case PROPERTIES_SIZE_EXCEEDED:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
- case UNKNOWN_ERROR:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
- default:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
- }
-
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
- beginTimeInLock = 0;
- } finally {
- putMessageLock.unlock();
- }
-
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
- }
-
- if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
- this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
- }
-
- PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
-
- // Statistics
- storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
- storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
-
- handleDiskFlush(result, putMessageResult, msg);
- handleHA(result, putMessageResult, msg);
-
- return putMessageResult;
- }
-
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -951,179 +867,6 @@ public class CommitLog {
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
-
- public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
- // Synchronization flush
- if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
- if (messageExt.isWaitStoreMsgOK()) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
- service.putRequest(request);
- CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
- PutMessageStatus flushStatus = null;
- try {
- flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
- TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- //flushOK=false;
- }
- if (flushStatus != PutMessageStatus.PUT_OK) {
- log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
- + " client address: " + messageExt.getBornHostString());
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
- }
- } else {
- service.wakeup();
- }
- }
- // Asynchronous flush
- else {
- if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- flushCommitLogService.wakeup();
- } else {
- commitLogService.wakeup();
- }
- }
- }
-
- public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
- if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
- HAService service = this.defaultMessageStore.getHaService();
- if (messageExt.isWaitStoreMsgOK()) {
- // Determine whether to wait
- if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
- service.putRequest(request);
- service.getWaitNotifyObject().wakeupAll();
- PutMessageStatus replicaStatus = null;
- try {
- replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
- TimeUnit.MILLISECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- }
- if (replicaStatus != PutMessageStatus.PUT_OK) {
- log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
- + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
- }
- }
- // Slave problem
- else {
- // Tell the producer, slave not available
- putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
- }
- }
- }
-
- }
-
- public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
- messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
- AppendMessageResult result;
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
- final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
- if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
- }
- if (messageExtBatch.getDelayTimeLevel() > 0) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
- }
-
- InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
- if (bornSocketAddress.getAddress() instanceof Inet6Address) {
- messageExtBatch.setBornHostV6Flag();
- }
-
- InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
- if (storeSocketAddress.getAddress() instanceof Inet6Address) {
- messageExtBatch.setStoreHostAddressV6Flag();
- }
-
- long elapsedTimeInLock = 0;
- MappedFile unlockMappedFile = null;
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
- //fine-grained lock instead of the coarse-grained
- MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
-
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
-
- putMessageLock.lock();
- try {
- long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
- this.beginTimeInLock = beginLockTimestamp;
-
- // Here settings are stored timestamp, in order to ensure an orderly
- // global
- messageExtBatch.setStoreTimestamp(beginLockTimestamp);
-
- if (null == mappedFile || mappedFile.isFull()) {
- mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
- }
- if (null == mappedFile) {
- log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
- }
-
- result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
- switch (result.getStatus()) {
- case PUT_OK:
- break;
- case END_OF_FILE:
- unlockMappedFile = mappedFile;
- // Create a new file, re-write the message
- mappedFile = this.mappedFileQueue.getLastMappedFile(0);
- if (null == mappedFile) {
- // XXX: warn and notify me
- log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
- }
- result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
- break;
- case MESSAGE_SIZE_EXCEEDED:
- case PROPERTIES_SIZE_EXCEEDED:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
- case UNKNOWN_ERROR:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
- default:
- beginTimeInLock = 0;
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
- }
-
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
- beginTimeInLock = 0;
- } finally {
- putMessageLock.unlock();
- }
-
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
- }
-
- if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
- this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
- }
-
- PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
-
- // Statistics
- storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
- storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
-
- handleDiskFlush(result, putMessageResult, messageExtBatch);
-
- handleHA(result, putMessageResult, messageExtBatch);
-
- return putMessageResult;
- }
-
/**
* According to receive certain message or offset storage time if an error occurs, it returns -1
*/
@@ -1509,50 +1252,33 @@ public class CommitLog {
private final ByteBuffer msgStoreItemMemory;
// The maximum length of the message
private final int maxMessageSize;
- // Build Message Key
- private final StringBuilder keyBuilder = new StringBuilder();
-
- private final StringBuilder msgIdBuilder = new StringBuilder();
DefaultAppendMessageCallback(final int size) {
this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
- this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
+ this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
- public ByteBuffer getMsgStoreItemMemory() {
- return msgStoreItemMemory;
- }
-
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
- final MessageExtBrokerInner msgInner) {
+ final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
- int sysflag = msgInner.getSysFlag();
-
- int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
- int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
- ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
- ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
-
- this.resetByteBuffer(storeHostHolder, storeHostLength);
- String msgId;
- if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
- msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
- } else {
- msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
- }
+ Supplier<String> msgIdSupplier = () -> {
+ int sysflag = msgInner.getSysFlag();
+ int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+ ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+ MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
+ msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
+ msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
+ return UtilAll.bytes2string(msgIdBuffer.array());
+ };
// Record ConsumeQueue information
- keyBuilder.setLength(0);
- keyBuilder.append(msgInner.getTopic());
- keyBuilder.append('-');
- keyBuilder.append(msgInner.getQueueId());
- String key = keyBuilder.toString();
+ String key = putMessageContext.getTopicQueueTableKey();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
@@ -1574,36 +1300,12 @@ public class CommitLog {
break;
}
- /**
- * Serialize message
- */
- final byte[] propertiesData =
- msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
-
- final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
-
- if (propertiesLength > Short.MAX_VALUE) {
- log.warn("putMessage message properties length too long. length={}", propertiesData.length);
- return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
- }
-
- final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
- final int topicLength = topicData.length;
-
- final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
-
- final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
-
- // Exceeds the maximum message
- if (msgLen > this.maxMessageSize) {
- CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
- + ", maxMessageSize: " + this.maxMessageSize);
- return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
- }
+ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+ final int msgLen = preEncodeBuffer.getInt(0);
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
- this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+ this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
@@ -1611,60 +1313,31 @@ public class CommitLog {
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
- byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
- return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
- queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+ return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
+ maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
+ msgIdSupplier, msgInner.getStoreTimestamp(),
+ queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
- // Initialization of storage space
- this.resetByteBuffer(msgStoreItemMemory, msgLen);
- // 1 TOTALSIZE
- this.msgStoreItemMemory.putInt(msgLen);
- // 2 MAGICCODE
- this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
- // 3 BODYCRC
- this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
- // 4 QUEUEID
- this.msgStoreItemMemory.putInt(msgInner.getQueueId());
- // 5 FLAG
- this.msgStoreItemMemory.putInt(msgInner.getFlag());
+ int pos = 4 + 4 + 4 + 4 + 4;
// 6 QUEUEOFFSET
- this.msgStoreItemMemory.putLong(queueOffset);
+ preEncodeBuffer.putLong(pos, queueOffset);
+ pos += 8;
// 7 PHYSICALOFFSET
- this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
- // 8 SYSFLAG
- this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
- // 9 BORNTIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
- // 10 BORNHOST
- this.resetByteBuffer(bornHostHolder, bornHostLength);
- this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
- // 11 STORETIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
- // 12 STOREHOSTADDRESS
- this.resetByteBuffer(storeHostHolder, storeHostLength);
- this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
- // 13 RECONSUMETIMES
- this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
- // 14 Prepared Transaction Offset
- this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
- // 15 BODY
- this.msgStoreItemMemory.putInt(bodyLength);
- if (bodyLength > 0)
- this.msgStoreItemMemory.put(msgInner.getBody());
- // 16 TOPIC
- this.msgStoreItemMemory.put((byte) topicLength);
- this.msgStoreItemMemory.put(topicData);
- // 17 PROPERTIES
- this.msgStoreItemMemory.putShort((short) propertiesLength);
- if (propertiesLength > 0)
- this.msgStoreItemMemory.put(propertiesData);
+ preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
+ int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+ // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+ pos += 8 + 4 + 8 + ipLen;
+ // refresh store time stamp in lock
+ preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
- byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
-
- AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+ byteBuffer.put(preEncodeBuffer);
+ msgInner.setEncodedBuff(null);
+ AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
@@ -1683,16 +1356,12 @@ public class CommitLog {
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
- final MessageExtBatch messageExtBatch) {
+ final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// Record ConsumeQueue information
- keyBuilder.setLength(0);
- keyBuilder.append(messageExtBatch.getTopic());
- keyBuilder.append('-');
- keyBuilder.append(messageExtBatch.getQueueId());
- String key = keyBuilder.toString();
+ String key = putMessageContext.getTopicQueueTableKey();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
@@ -1701,17 +1370,35 @@ public class CommitLog {
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
- msgIdBuilder.setLength(0);
+
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
int sysFlag = messageExtBatch.getSysFlag();
+ int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
- ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+ Supplier<String> msgIdSupplier = () -> {
+ int msgIdLen = storeHostLength + 8;
+ int batchCount = putMessageContext.getBatchSize();
+ long[] phyPosArray = putMessageContext.getPhyPos();
+ ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+ MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
+ msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
+
+ StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
+ for (int i = 0; i < phyPosArray.length; i++) {
+ msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
+ String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+ if (i != 0) {
+ buffer.append(',');
+ }
+ buffer.append(msgId);
+ }
+ return buffer.toString();
+ };
- this.resetByteBuffer(storeHostHolder, storeHostLength);
- ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder);
messagesByteBuff.mark();
+ int index = 0;
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
@@ -1726,7 +1413,7 @@ public class CommitLog {
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
- this.resetByteBuffer(this.msgStoreItemMemory, 8);
+ this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
@@ -1737,27 +1424,20 @@ public class CommitLog {
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
- return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
+ return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
- messagesByteBuff.position(msgPos + 20);
- messagesByteBuff.putLong(queueOffset);
- messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
-
- storeHostBytes.rewind();
- String msgId;
- if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
- msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
- } else {
- msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
- }
-
- if (msgIdBuilder.length() > 0) {
- msgIdBuilder.append(',').append(msgId);
- } else {
- msgIdBuilder.append(msgId);
- }
+ int pos = msgPos + 20;
+ messagesByteBuff.putLong(pos, queueOffset);
+ pos += 8;
+ messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen);
+ // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+ pos += 8 + 4 + 8 + bornHostLength;
+ // refresh store time stamp in lock
+ messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
+
+ putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
@@ -1767,7 +1447,7 @@ public class CommitLog {
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
- AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
+ AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
@@ -1782,19 +1462,104 @@ public class CommitLog {
}
- public static class MessageExtBatchEncoder {
+ public static class MessageExtEncoder {
// Store the message content
- private final ByteBuffer msgBatchMemory;
+ private final ByteBuffer encoderBuffer;
// The maximum length of the message
private final int maxMessageSize;
- MessageExtBatchEncoder(final int size) {
- this.msgBatchMemory = ByteBuffer.allocateDirect(size);
+ MessageExtEncoder(final int size) {
+ this.encoderBuffer = ByteBuffer.allocateDirect(size);
this.maxMessageSize = size;
}
- public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
- msgBatchMemory.clear(); //not thread-safe
+ private void socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+ InetAddress address = inetSocketAddress.getAddress();
+ if (address instanceof Inet4Address) {
+ byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
+ } else {
+ byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);
+ }
+ byteBuffer.putInt(inetSocketAddress.getPort());
+ }
+
+ protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
+ /**
+ * Serialize message
+ */
+ final byte[] propertiesData =
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+ final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
+
+ if (propertiesLength > Short.MAX_VALUE) {
+ log.warn("putMessage message properties length too long. length={}", propertiesData.length);
+ return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+ }
+
+ final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ final int topicLength = topicData.length;
+
+ final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
+
+ final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
+
+ // Exceeds the maximum message
+ if (msgLen > this.maxMessageSize) {
+ CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ + ", maxMessageSize: " + this.maxMessageSize);
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+
+ // Initialization of storage space
+ this.resetByteBuffer(encoderBuffer, msgLen);
+ // 1 TOTALSIZE
+ this.encoderBuffer.putInt(msgLen);
+ // 2 MAGICCODE
+ this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+ // 3 BODYCRC
+ this.encoderBuffer.putInt(msgInner.getBodyCRC());
+ // 4 QUEUEID
+ this.encoderBuffer.putInt(msgInner.getQueueId());
+ // 5 FLAG
+ this.encoderBuffer.putInt(msgInner.getFlag());
+ // 6 QUEUEOFFSET, need update later
+ this.encoderBuffer.putLong(0);
+ // 7 PHYSICALOFFSET, need update later
+ this.encoderBuffer.putLong(0);
+ // 8 SYSFLAG
+ this.encoderBuffer.putInt(msgInner.getSysFlag());
+ // 9 BORNTIMESTAMP
+ this.encoderBuffer.putLong(msgInner.getBornTimestamp());
+ // 10 BORNHOST
+ socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
+ // 11 STORETIMESTAMP
+ this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
+ // 12 STOREHOSTADDRESS
+ socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
+ // 13 RECONSUMETIMES
+ this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
+ // 15 BODY
+ this.encoderBuffer.putInt(bodyLength);
+ if (bodyLength > 0)
+ this.encoderBuffer.put(msgInner.getBody());
+ // 16 TOPIC
+ this.encoderBuffer.put((byte) topicLength);
+ this.encoderBuffer.put(topicData);
+ // 17 PROPERTIES
+ this.encoderBuffer.putShort((short) propertiesLength);
+ if (propertiesLength > 0)
+ this.encoderBuffer.put(propertiesData);
+
+ encoderBuffer.flip();
+ return null;
+ }
+
+ protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+ encoderBuffer.clear(); //not thread-safe
int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
@@ -1809,7 +1574,9 @@ public class CommitLog {
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
final short batchPropLen = (short) batchPropData.length;
+ int batchSize = 0;
while (messagesByteBuff.hasRemaining()) {
+ batchSize++;
// 1 TOTALSIZE
messagesByteBuff.getInt();
// 2 MAGICCODE
@@ -1849,53 +1616,55 @@ public class CommitLog {
}
// 1 TOTALSIZE
- this.msgBatchMemory.putInt(msgLen);
+ this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE
- this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+ this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
- this.msgBatchMemory.putInt(bodyCrc);
+ this.encoderBuffer.putInt(bodyCrc);
// 4 QUEUEID
- this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
+ this.encoderBuffer.putInt(messageExtBatch.getQueueId());
// 5 FLAG
- this.msgBatchMemory.putInt(flag);
+ this.encoderBuffer.putInt(flag);
// 6 QUEUEOFFSET
- this.msgBatchMemory.putLong(0);
+ this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET
- this.msgBatchMemory.putLong(0);
+ this.encoderBuffer.putLong(0);
// 8 SYSFLAG
- this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
+ this.encoderBuffer.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
- this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
+ this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
- this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
+ this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
- this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
+ this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
- this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
+ this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
- this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
+ this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset, batch does not support transaction
- this.msgBatchMemory.putLong(0);
+ this.encoderBuffer.putLong(0);
// 15 BODY
- this.msgBatchMemory.putInt(bodyLen);
+ this.encoderBuffer.putInt(bodyLen);
if (bodyLen > 0)
- this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
+ this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);
// 16 TOPIC
- this.msgBatchMemory.put((byte) topicLength);
- this.msgBatchMemory.put(topicData);
+ this.encoderBuffer.put((byte) topicLength);
+ this.encoderBuffer.put(topicData);
// 17 PROPERTIES
- this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen));
+ this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen));
if (propertiesLen > 0) {
- this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
+ this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
if (batchPropLen > 0) {
- this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
+ this.encoderBuffer.put(batchPropData, 0, batchPropLen);
}
}
- msgBatchMemory.flip();
- return msgBatchMemory;
+ putMessageContext.setBatchSize(batchSize);
+ putMessageContext.setPhyPos(new long[batchSize]);
+ encoderBuffer.flip();
+ return encoderBuffer;
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
@@ -1904,4 +1673,51 @@ public class CommitLog {
}
}
+
+ static class PutMessageThreadLocal {
+ private MessageExtEncoder encoder;
+ private StringBuilder keyBuilder;
+ PutMessageThreadLocal(int size) {
+ encoder = new MessageExtEncoder(size);
+ keyBuilder = new StringBuilder();
+ }
+
+ public MessageExtEncoder getEncoder() {
+ return encoder;
+ }
+
+ public StringBuilder getKeyBuilder() {
+ return keyBuilder;
+ }
+ }
+
+ static class PutMessageContext {
+ private String topicQueueTableKey;
+ private long[] phyPos;
+ private int batchSize;
+
+ public PutMessageContext(String topicQueueTableKey) {
+ this.topicQueueTableKey = topicQueueTableKey;
+ }
+
+ public String getTopicQueueTableKey() {
+ return topicQueueTableKey;
+ }
+
+ public long[] getPhyPos() {
+ return phyPos;
+ }
+
+ public void setPhyPos(long[] phyPos) {
+ this.phyPos = phyPos;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7dd5a32..69019c1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore {
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- PutMessageStatus checkStoreStatus = this.checkStoreStatus();
- if (checkStoreStatus != PutMessageStatus.PUT_OK) {
- return new PutMessageResult(checkStoreStatus, null);
- }
-
- PutMessageStatus msgCheckStatus = this.checkMessage(msg);
- if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
- return new PutMessageResult(msgCheckStatus, null);
- }
-
- long beginTime = this.getSystemClock().now();
- PutMessageResult result = this.commitLog.putMessage(msg);
- long elapsedTime = this.getSystemClock().now() - beginTime;
- if (elapsedTime > 500) {
- log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
- }
-
- this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
- if (null == result || !result.isOk()) {
- this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+ try {
+ return asyncPutMessage(msg).get();
+ } catch (InterruptedException | ExecutionException e) {
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
}
-
- return result;
}
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
- PutMessageStatus checkStoreStatus = this.checkStoreStatus();
- if (checkStoreStatus != PutMessageStatus.PUT_OK) {
- return new PutMessageResult(checkStoreStatus, null);
- }
-
- PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
- if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
- return new PutMessageResult(msgCheckStatus, null);
- }
-
- long beginTime = this.getSystemClock().now();
- PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
- long elapsedTime = this.getSystemClock().now() - beginTime;
- if (elapsedTime > 500) {
- log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
- }
-
- this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
-
- if (null == result || !result.isOk()) {
- this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+ try {
+ return asyncPutMessages(messageExtBatch).get();
+ } catch (InterruptedException | ExecutionException e) {
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
}
-
- return result;
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 25f0e39..297271d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;
@@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource {
return fileChannel;
}
- public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
- return appendMessagesInner(msg, cb);
+ public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
+ PutMessageContext putMessageContext) {
+ return appendMessagesInner(msg, cb, putMessageContext);
}
- public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
- return appendMessagesInner(messageExtBatch, cb);
+ public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
+ PutMessageContext putMessageContext) {
+ return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}
- public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
+ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
+ PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
@@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource {
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
+ result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
+ (MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
+ result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
+ (MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
index e5f087b..df7e6e5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.store;
+import java.nio.ByteBuffer;
+
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageExt;
@@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt {
private String propertiesString;
private long tagsCode;
+ private ByteBuffer encodedBuff;
+
+ public ByteBuffer getEncodedBuff() {
+ return encodedBuff;
+ }
+
+ public void setEncodedBuff(ByteBuffer encodedBuff) {
+ this.encodedBuff = encodedBuff;
+ }
+
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index ea791bd..011cbe1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -414,237 +413,6 @@ public class DLedgerCommitLog extends CommitLog {
}
@Override
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- String topic = msg.getTopic();
- setMessageInfo(msg,tranType);
-
- // Back to Results
- AppendMessageResult appendResult;
- AppendFuture<AppendEntryResponse> dledgerFuture;
- EncodeResult encodeResult;
-
- encodeResult = this.messageSerializer.serialize(msg);
- if (encodeResult.status != AppendMessageStatus.PUT_OK) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
- }
-
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
- long elapsedTimeInLock;
- long queueOffset;
- try {
- beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
- queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, false);
- AppendEntryRequest request = new AppendEntryRequest();
- request.setGroup(dLedgerConfig.getGroup());
- request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
- request.setBody(encodeResult.getData());
- dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
- if (dledgerFuture.getPos() == -1) {
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
- }
- long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
-
- int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
- ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
- String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
- appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
- switch (tranType) {
- case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
- break;
- case MessageSysFlag.TRANSACTION_NOT_TYPE:
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
- // The next update ConsumeQueue information
- DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- log.error("Put message error", e);
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
- } finally {
- beginTimeInDledgerLock = 0;
- putMessageLock.unlock();
- }
-
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
- }
-
- PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
- try {
- AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
- switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
- case SUCCESS:
- putMessageStatus = PutMessageStatus.PUT_OK;
- break;
- case INCONSISTENT_LEADER:
- case NOT_LEADER:
- case LEADER_NOT_READY:
- case DISK_FULL:
- putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
- break;
- case WAIT_QUORUM_ACK_TIMEOUT:
- //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
- putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
- break;
- case LEADER_PENDING_FULL:
- putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
- break;
- }
- } catch (Throwable t) {
- log.error("Failed to get dledger append result", t);
- }
-
- PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
- if (putMessageStatus == PutMessageStatus.PUT_OK) {
- // Statistics
- storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
- storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
- }
- return putMessageResult;
- }
-
- @Override
- public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
- final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
-
- if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
- }
- if (messageExtBatch.getDelayTimeLevel() > 0) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
- }
-
- // Set the storage time
- messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
- InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
- if (bornSocketAddress.getAddress() instanceof Inet6Address) {
- messageExtBatch.setBornHostV6Flag();
- }
-
- InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
- if (storeSocketAddress.getAddress() instanceof Inet6Address) {
- messageExtBatch.setStoreHostAddressV6Flag();
- }
-
- // Back to Results
- AppendMessageResult appendResult;
- BatchAppendFuture<AppendEntryResponse> dledgerFuture;
- EncodeResult encodeResult;
-
- encodeResult = this.messageSerializer.serialize(messageExtBatch);
- if (encodeResult.status != AppendMessageStatus.PUT_OK) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
- .status));
- }
-
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
- msgIdBuilder.setLength(0);
- long elapsedTimeInLock;
- long queueOffset;
- int msgNum = 0;
- try {
- beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
- queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
- encodeResult.setQueueOffsetKey(queueOffset, true);
- BatchAppendEntryRequest request = new BatchAppendEntryRequest();
- request.setGroup(dLedgerConfig.getGroup());
- request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
- request.setBatchMsgs(encodeResult.batchData);
- AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
- if (appendFuture.getPos() == -1) {
- log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
- }
- dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;
-
- long wroteOffset = 0;
-
- int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
- ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
-
- boolean isFirstOffset = true;
- long firstWroteOffset = 0;
- for (long pos : dledgerFuture.getPositions()) {
- wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
- if (isFirstOffset) {
- firstWroteOffset = wroteOffset;
- isFirstOffset = false;
- }
- String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
- if (msgIdBuilder.length() > 0) {
- msgIdBuilder.append(',').append(msgId);
- } else {
- msgIdBuilder.append(msgId);
- }
- msgNum++;
- }
-
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
- appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
- msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
- appendResult.setMsgNum(msgNum);
- DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
- } catch (Exception e) {
- log.error("Put message error", e);
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
- .UNKNOWN_ERROR));
- } finally {
- beginTimeInDledgerLock = 0;
- putMessageLock.unlock();
- }
-
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
- elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
- }
-
- PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
- try {
- AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
- switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
- case SUCCESS:
- putMessageStatus = PutMessageStatus.PUT_OK;
- break;
- case INCONSISTENT_LEADER:
- case NOT_LEADER:
- case LEADER_NOT_READY:
- case DISK_FULL:
- putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
- break;
- case WAIT_QUORUM_ACK_TIMEOUT:
- //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
- putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
- break;
- case LEADER_PENDING_FULL:
- putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
- break;
- }
- } catch (Throwable t) {
- log.error("Failed to get dledger append result", t);
- }
-
- PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
- if (putMessageStatus == PutMessageStatus.PUT_OK) {
- // Statistics
- storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
- storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
- }
- return putMessageResult;
- }
-
- @Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index f46b3be..715c9d3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.CommitLog.MessageExtEncoder;
+import org.apache.rocketmq.store.CommitLog.PutMessageContext;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Before;
@@ -42,7 +44,7 @@ public class AppendCallbackTest {
AppendMessageCallback callback;
- CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
+ MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
@Before
public void init() throws Exception {
@@ -84,10 +86,12 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+ messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
//encounter end of file when append half of the data
- AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch);
+ AppendMessageResult result =
+ callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
assertEquals(0, result.getWroteOffset());
assertEquals(0, result.getLogicsOffset());
@@ -121,10 +125,12 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+ messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
//encounter end of file when append half of the data
- AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch);
+ AppendMessageResult result =
+ callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
assertEquals(0, result.getWroteOffset());
assertEquals(0, result.getLogicsOffset());
@@ -154,9 +160,11 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+ messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
- AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch);
+ AppendMessageResult allresult =
+ callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
assertEquals(0, allresult.getWroteOffset());
@@ -214,9 +222,11 @@ public class AppendCallbackTest {
messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
- messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
+ PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+ messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
- AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch);
+ AppendMessageResult allresult =
+ callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
assertEquals(0, allresult.getWroteOffset());