You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/12/08 08:59:21 UTC
[rocketmq] branch develop updated: [ISSUE #690] Support batch msgs in dledger mode (#2406)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4730987 [ISSUE #690] Support batch msgs in dledger mode (#2406)
4730987 is described below
commit 4730987c9cc046c6db980a48aaed9eacc13c630d
Author: TerrellChen <39...@users.noreply.github.com>
AuthorDate: Tue Dec 8 16:58:59 2020 +0800
[ISSUE #690] Support batch msgs in dledger mode (#2406)
* implement issue-690
* add unit test
* fix version
* fix wroteOffset; update version; polish
* polish
* fix wrong wroteOffset of AppendMessageResult
* move serialization out of lock in async method
---
store/pom.xml | 2 +-
.../rocketmq/store/dledger/DLedgerCommitLog.java | 376 ++++++++++++++++++++-
.../org/apache/rocketmq/store/StoreTestBase.java | 68 +++-
.../store/dledger/DLedgerCommitlogTest.java | 122 ++++++-
4 files changed, 544 insertions(+), 24 deletions(-)
diff --git a/store/pom.xml b/store/pom.xml
index 8f4b44a..bcb3e6a 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
- <version>0.2.0</version>
+ <version>0.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 24e0f69..9a6e7a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -17,11 +17,13 @@
package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.BatchAppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFile;
@@ -32,6 +34,8 @@ import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
@@ -74,6 +78,8 @@ public class DLedgerCommitLog extends CommitLog {
private boolean isInrecoveringOldCommitlog = false;
+ private final StringBuilder msgIdBuilder = new StringBuilder();
+
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
super(defaultMessageStore);
dLedgerConfig = new DLedgerConfig();
@@ -507,7 +513,129 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+ if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+ if (messageExtBatch.getDelayTimeLevel() > 0) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ }
+
+ // Set the storage time
+ messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+ StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+ InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setStoreHostAddressV6Flag();
+ }
+
+ // Back to Results
+ AppendMessageResult appendResult;
+ BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+ EncodeResult encodeResult;
+
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+ msgIdBuilder.setLength(0);
+ long elapsedTimeInLock;
+ long queueOffset;
+ long msgNum = 0;
+ try {
+ beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+ encodeResult = this.messageSerializer.serialize(messageExtBatch);
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+ .status));
+ }
+ BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+ request.setGroup(dLedgerConfig.getGroup());
+ request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+ request.setBatchMsgs(encodeResult.batchData);
+ dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+ if (dledgerFuture.getPos() == -1) {
+ log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+ return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+ }
+ long wroteOffset = 0;
+
+ int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+ ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+ boolean isFirstOffset = true;
+ long firstWroteOffset = 0;
+ for (long pos : dledgerFuture.getPositions()) {
+ wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+ if (isFirstOffset) {
+ firstWroteOffset = wroteOffset;
+ isFirstOffset = false;
+ }
+ String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+ if (msgIdBuilder.length() > 0) {
+ msgIdBuilder.append(',').append(msgId);
+ } else {
+ msgIdBuilder.append(msgId);
+ }
+ msgNum++;
+ }
+
+ elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
+ msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+ DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
+ } catch (Exception e) {
+ log.error("Put message error", e);
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus
+ .UNKNOWN_ERROR));
+ } finally {
+ beginTimeInDledgerLock = 0;
+ putMessageLock.unlock();
+ }
+
+ if (elapsedTimeInLock > 500) {
+ log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
+ elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
+ }
+
+ PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+ try {
+ AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
+ switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+ case SUCCESS:
+ putMessageStatus = PutMessageStatus.PUT_OK;
+ break;
+ case INCONSISTENT_LEADER:
+ case NOT_LEADER:
+ case LEADER_NOT_READY:
+ case DISK_FULL:
+ putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+ break;
+ case WAIT_QUORUM_ACK_TIMEOUT:
+ //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ case LEADER_PENDING_FULL:
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ }
+ } catch (Throwable t) {
+ log.error("Failed to get dledger append result", t);
+ }
+
+ PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+ if (putMessageStatus == PutMessageStatus.PUT_OK) {
+ // Statistics
+ storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum);
+ storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen);
+ }
+ return putMessageResult;
}
@Override
@@ -609,7 +737,125 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+ if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+ if (messageExtBatch.getDelayTimeLevel() > 0) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+
+ // Set the storage time
+ messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+ StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
+
+ InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setStoreHostAddressV6Flag();
+ }
+
+ // Back to Results
+ AppendMessageResult appendResult;
+ BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+ EncodeResult encodeResult;
+
+ encodeResult = this.messageSerializer.serialize(messageExtBatch);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
+ .status)));
+ }
+
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+ msgIdBuilder.setLength(0);
+ long elapsedTimeInLock;
+ long queueOffset;
+ long msgNum = 0;
+ try {
+ beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+ request.setGroup(dLedgerConfig.getGroup());
+ request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+ request.setBatchMsgs(encodeResult.batchData);
+ dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+ if (dledgerFuture.getPos() == -1) {
+ log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ }
+ long wroteOffset = 0;
+
+ int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+ ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+ boolean isFirstOffset = true;
+ long firstWroteOffset = 0;
+ for (long pos : dledgerFuture.getPositions()) {
+ wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+ if (isFirstOffset) {
+ firstWroteOffset = wroteOffset;
+ isFirstOffset = false;
+ }
+ String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
+ if (msgIdBuilder.length() > 0) {
+ msgIdBuilder.append(',').append(msgId);
+ } else {
+ msgIdBuilder.append(msgId);
+ }
+ msgNum++;
+ }
+
+ elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
+ msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
+ DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum);
+ } catch (Exception e) {
+ log.error("Put message error", e);
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ } finally {
+ beginTimeInDledgerLock = 0;
+ putMessageLock.unlock();
+ }
+
+ if (elapsedTimeInLock > 500) {
+ log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
+ elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
+ }
+
+ return dledgerFuture.thenApply(appendEntryResponse -> {
+ PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+ switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+ case SUCCESS:
+ putMessageStatus = PutMessageStatus.PUT_OK;
+ break;
+ case INCONSISTENT_LEADER:
+ case NOT_LEADER:
+ case LEADER_NOT_READY:
+ case DISK_FULL:
+ putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+ break;
+ case WAIT_QUORUM_ACK_TIMEOUT:
+ //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ case LEADER_PENDING_FULL:
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ }
+ PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
+ if (putMessageStatus == PutMessageStatus.PUT_OK) {
+ // Statistics
+ storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet();
+ storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes());
+ }
+ return putMessageResult;
+ });
}
@Override
@@ -701,7 +947,9 @@ public class DLedgerCommitLog extends CommitLog {
class EncodeResult {
private String queueOffsetKey;
private ByteBuffer data;
+ private List<byte[]> batchData;
private AppendMessageStatus status;
+ private int totalMsgLen;
public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOffsetKey) {
this.data = data;
@@ -716,6 +964,13 @@ public class DLedgerCommitLog extends CommitLog {
public byte[] getData() {
return data.array();
}
+
+ public EncodeResult(AppendMessageStatus status, String queueOffsetKey, List<byte[]> batchData, int totalMsgLen) {
+ this.batchData = batchData;
+ this.status = status;
+ this.queueOffsetKey = queueOffsetKey;
+ this.totalMsgLen = totalMsgLen;
+ }
}
class MessageSerializer {
@@ -823,6 +1078,123 @@ public class DLedgerCommitLog extends CommitLog {
return new EncodeResult(AppendMessageStatus.PUT_OK, msgStoreItemMemory, key);
}
+ public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
+ keyBuilder.setLength(0);
+ keyBuilder.append(messageExtBatch.getTopic());
+ keyBuilder.append('-');
+ keyBuilder.append(messageExtBatch.getQueueId());
+ String key = keyBuilder.toString();
+
+ Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+ if (null == queueOffset) {
+ queueOffset = 0L;
+ DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+ }
+
+ int totalMsgLen = 0;
+ ByteBuffer messagesByteBuff = messageExtBatch.wrap();
+ List<byte[]> batchBody = new LinkedList<>();
+
+ int sysFlag = messageExtBatch.getSysFlag();
+ int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+ int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+ ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
+ ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+
+ while (messagesByteBuff.hasRemaining()) {
+ // 1 TOTALSIZE
+ messagesByteBuff.getInt();
+ // 2 MAGICCODE
+ messagesByteBuff.getInt();
+ // 3 BODYCRC
+ messagesByteBuff.getInt();
+ // 4 FLAG
+ int flag = messagesByteBuff.getInt();
+ // 5 BODY
+ int bodyLen = messagesByteBuff.getInt();
+ int bodyPos = messagesByteBuff.position();
+ int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
+ messagesByteBuff.position(bodyPos + bodyLen);
+ // 6 properties
+ short propertiesLen = messagesByteBuff.getShort();
+ int propertiesPos = messagesByteBuff.position();
+ messagesByteBuff.position(propertiesPos + propertiesLen);
+
+ final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+
+ final int topicLength = topicData.length;
+
+ final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
+ ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
+ // Exceeds the maximum message
+ if (msgLen > this.maxMessageSize) {
+ CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " +
+ bodyLen
+ + ", maxMessageSize: " + this.maxMessageSize);
+ throw new RuntimeException("message size exceeded");
+ }
+
+ totalMsgLen += msgLen;
+ // Determines whether there is sufficient free space
+ if (totalMsgLen > maxMessageSize) {
+ throw new RuntimeException("message size exceeded");
+ }
+
+ // Initialization of storage space
+ this.resetByteBuffer(msgStoreItemMemory, msgLen);
+ // 1 TOTALSIZE
+ msgStoreItemMemory.putInt(msgLen);
+ // 2 MAGICCODE
+ msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
+ // 3 BODYCRC
+ msgStoreItemMemory.putInt(bodyCrc);
+ // 4 QUEUEID
+ msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
+ // 5 FLAG
+ msgStoreItemMemory.putInt(flag);
+ // 6 QUEUEOFFSET
+ msgStoreItemMemory.putLong(queueOffset++);
+ // 7 PHYSICALOFFSET
+ msgStoreItemMemory.putLong(0);
+ // 8 SYSFLAG
+ msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
+ // 9 BORNTIMESTAMP
+ msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
+ // 10 BORNHOST
+ resetByteBuffer(bornHostHolder, bornHostLength);
+ msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
+ // 11 STORETIMESTAMP
+ msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
+ // 12 STOREHOSTADDRESS
+ resetByteBuffer(storeHostHolder, storeHostLength);
+ msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
+ // 13 RECONSUMETIMES
+ msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ msgStoreItemMemory.putLong(0);
+ // 15 BODY
+ msgStoreItemMemory.putInt(bodyLen);
+ if (bodyLen > 0) {
+ msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
+ }
+ // 16 TOPIC
+ msgStoreItemMemory.put((byte) topicLength);
+ msgStoreItemMemory.put(topicData);
+ // 17 PROPERTIES
+ msgStoreItemMemory.putShort(propertiesLen);
+ if (propertiesLen > 0) {
+ msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
+ }
+ byte[] data = new byte[msgLen];
+ msgStoreItemMemory.clear();
+ msgStoreItemMemory.get(data);
+ batchBody.add(data);
+ }
+
+ return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
+ }
+
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
index a736754..5660de1 100644
--- a/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/StoreTestBase.java
@@ -16,17 +16,19 @@
*/
package org.apache.rocketmq.store;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.junit.After;
+
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.rocketmq.common.UtilAll;
-import org.junit.After;
public class StoreTestBase {
@@ -44,6 +46,28 @@ public class StoreTestBase {
return port.addAndGet(5);
}
+ protected MessageExtBatch buildBatchMessage(int size) {
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setTopic("StoreTest");
+ messageExtBatch.setTags("TAG1");
+ messageExtBatch.setKeys("Hello");
+ messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+ messageExtBatch.setSysFlag(0);
+
+ messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+ messageExtBatch.setBornHost(BornHost);
+ messageExtBatch.setStoreHost(StoreHost);
+
+ List<Message> messageList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ messageList.add(buildMessage());
+ }
+
+ messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
+
+ return messageExtBatch;
+ }
+
protected MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest");
@@ -59,6 +83,40 @@ public class StoreTestBase {
return msg;
}
+ protected MessageExtBatch buildIPv6HostBatchMessage(int size) {
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setTopic("StoreTest");
+ messageExtBatch.setTags("TAG1");
+ messageExtBatch.setKeys("Hello");
+ messageExtBatch.setBody(MessageBody);
+ messageExtBatch.setMsgId("24084004018081003FAA1DDE2B3F898A00002A9F0000000000000CA0");
+ messageExtBatch.setKeys(String.valueOf(System.currentTimeMillis()));
+ messageExtBatch.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+ messageExtBatch.setSysFlag(0);
+ messageExtBatch.setBornHostV6Flag();
+ messageExtBatch.setStoreHostAddressV6Flag();
+ messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+ try {
+ messageExtBatch.setBornHost(new InetSocketAddress(InetAddress.getByName("1050:0000:0000:0000:0005:0600:300c:326b"), 8123));
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ messageExtBatch.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 8123));
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ List<Message> messageList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ messageList.add(buildIPv6HostMessage());
+ }
+
+ messageExtBatch.setBody(MessageDecoder.encodeMessages(messageList));
+ return messageExtBatch;
+ }
+
protected MessageExtBrokerInner buildIPv6HostMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("StoreTest");
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index e31d834..8ab8a23 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -19,14 +19,17 @@ package org.apache.rocketmq.store.dledger;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
@@ -41,7 +44,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
@Test
public void testTruncateCQ() throws Exception {
- String base = createBaseDir();
+ String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
@@ -94,10 +97,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
}
-
@Test
public void testRecover() throws Exception {
- String base = createBaseDir();
+ String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
@@ -135,10 +137,9 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
}
-
@Test
public void testPutAndGetMessage() throws Exception {
- String base = createBaseDir();
+ String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
@@ -148,7 +149,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner =
- i < 5 ? buildMessage() : buildIPv6HostMessage();
+ i < 5 ? buildMessage() : buildIPv6HostMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
@@ -160,7 +161,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
- GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
@@ -178,8 +179,52 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
}
@Test
+ public void testBatchPutAndGetMessage() throws Exception {
+ String base = createBaseDir();
+ String peers = String.format("n0-localhost:%d", nextPort());
+ String group = UUID.randomUUID().toString();
+ DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
+ Thread.sleep(1000);
+ String topic = UUID.randomUUID().toString();
+ // should be less than 4
+ int batchMessageSize = 2;
+ int repeat = 10;
+ List<PutMessageResult> results = new ArrayList<>();
+ for (int i = 0; i < repeat; i++) {
+ MessageExtBatch messageExtBatch =
+ i < repeat / 10 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
+ messageExtBatch.setTopic(topic);
+ messageExtBatch.setQueueId(0);
+ PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch);
+ results.add(putMessageResult);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
+ }
+ Thread.sleep(100);
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
+ Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 100, null);
+ Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+ Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
+ Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
+ Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
+
+ for (int i = 0; i < results.size(); i++) {
+ ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
+ }
+ messageStore.destroy();
+ messageStore.shutdown();
+ }
+
+ @Test
public void testAsyncPutAndGetMessage() throws Exception {
- String base = createBaseDir();
+ String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
@@ -189,7 +234,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
List<PutMessageResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
MessageExtBrokerInner msgInner =
- i < 5 ? buildMessage() : buildIPv6HostMessage();
+ i < 5 ? buildMessage() : buildIPv6HostMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessage(msgInner);
@@ -202,7 +247,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.dispatchBehindBytes());
- GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
Assert.assertEquals(10, getMessageResult.getMessageBufferList().size());
@@ -219,15 +264,60 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
messageStore.shutdown();
}
+ @Test
+ public void testAsyncBatchPutAndGetMessage() throws Exception {
+ String base = createBaseDir();
+ String peers = String.format("n0-localhost:%d", nextPort());
+ String group = UUID.randomUUID().toString();
+ DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
+ Thread.sleep(1000);
+ String topic = UUID.randomUUID().toString();
+ // should be less than 4
+ int batchMessageSize = 2;
+ int repeat = 10;
+
+ List<PutMessageResult> results = new ArrayList<>();
+ for (int i = 0; i < repeat; i++) {
+ MessageExtBatch messageExtBatch =
+ i < 5 ? buildBatchMessage(batchMessageSize) : buildIPv6HostBatchMessage(batchMessageSize);
+ messageExtBatch.setTopic(topic);
+ messageExtBatch.setQueueId(0);
+ CompletableFuture<PutMessageResult> futureResult = messageStore.asyncPutMessages(messageExtBatch);
+ PutMessageResult putMessageResult = futureResult.get(3000, TimeUnit.MILLISECONDS);
+ results.add(putMessageResult);
+ Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+ Assert.assertEquals(i * batchMessageSize, putMessageResult.getAppendMessageResult().getLogicsOffset());
+ }
+ Thread.sleep(100);
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(repeat * batchMessageSize, messageStore.getMaxOffsetInQueue(topic, 0));
+ Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, 0, 32, null);
+ Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
+
+ Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageBufferList().size());
+ Assert.assertEquals(repeat * batchMessageSize > 32 ? 32 : repeat * batchMessageSize, getMessageResult.getMessageMapedList().size());
+ Assert.assertEquals(repeat * batchMessageSize, getMessageResult.getMaxOffset());
+
+ for (int i = 0; i < results.size(); i++) {
+ ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i * batchMessageSize);
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ Assert.assertEquals(i * batchMessageSize, messageExt.getQueueOffset());
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId().split(",").length, batchMessageSize);
+ Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(), messageExt.getCommitLogOffset());
+ }
+ messageStore.destroy();
+ messageStore.shutdown();
+ }
@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
- DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
+ DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString();
- MessageExtBrokerInner msgInner = buildMessage();
+ MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
@@ -239,7 +329,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
- DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
+ DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));
@@ -258,10 +348,10 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
public void testIPv6HostMsgCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String group = UUID.randomUUID().toString();
- DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group,"n0", peers, "n0", false, 0);
+ DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
String topic = UUID.randomUUID().toString();
- MessageExtBrokerInner msgInner = buildIPv6HostMessage();
+ MessageExtBrokerInner msgInner = buildIPv6HostMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(0);
PutMessageResult putMessageResult = leaderStore.putMessage(msgInner);
@@ -273,7 +363,7 @@ public class DLedgerCommitlogTest extends MessageStoreTestBase {
Assert.assertEquals(0, leaderStore.getMaxOffsetInQueue(topic, 0));
- DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group,"n1", peers, "n0", false, 0);
+ DefaultMessageStore followerStore = createDledgerMessageStore(createBaseDir(), group, "n1", peers, "n0", false, 0);
Thread.sleep(2000);
Assert.assertEquals(1, leaderStore.getMaxOffsetInQueue(topic, 0));