You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/05/07 15:07:03 UTC
incubator-rocketmq git commit: Add javadoc to message store.
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 6898d96c0 -> e9814ad47
Add javadoc to message store.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e9814ad4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e9814ad4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e9814ad4
Branch: refs/heads/develop
Commit: e9814ad47e3c2fb277ee37db1eb48c6c43848404
Parents: 6898d96
Author: Zhanhui Li <li...@apache.org>
Authored: Sun May 7 23:07:03 2017 +0800
Committer: Zhanhui Li <li...@apache.org>
Committed: Sun May 7 23:07:03 2017 +0800
----------------------------------------------------------------------
.../broker/client/net/Broker2Client.java | 2 +-
.../longpolling/PullRequestHoldService.java | 4 +-
.../broker/offset/ConsumerOffsetManager.java | 4 +-
.../plugin/AbstractPluginMessageStore.java | 20 +-
.../broker/processor/AdminBrokerProcessor.java | 16 +-
.../processor/ConsumerManageProcessor.java | 2 +-
.../rocketmq/store/DefaultMessageStore.java | 16 +-
.../org/apache/rocketmq/store/MessageStore.java | 239 ++++++++++++++++++-
.../store/schedule/ScheduleMessageService.java | 2 +-
9 files changed, 259 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index c00898c..863da62 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -149,7 +149,7 @@ public class Broker2Client {
long timeStampOffset;
if (timeStamp == -1) {
- timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 1a53db1..71f56a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -98,7 +98,7 @@ public class PullRequestHoldService extends ServiceThread {
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
- final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+ final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
@@ -124,7 +124,7 @@ public class PullRequestHoldService extends ServiceThread {
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
- newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+ newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index bdcf30c..769c4ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -73,7 +73,7 @@ public class ConsumerOffsetManager extends ConfigManager {
while (it.hasNext() && result) {
Entry<Integer, Long> next = it.next();
- long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
+ long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());
long offsetInPersist = next.getValue();
result = offsetInPersist <= minOffsetInStore;
}
@@ -201,7 +201,7 @@ public class ConsumerOffsetManager extends ConfigManager {
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
if (topic.equals(topicGroupArr[0])) {
for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
- long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
+ long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
if (entry.getValue() >= minOffset) {
Long offset = queueMinOffset.get(entry.getKey());
if (offset == null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 8ded973..690f70b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -92,18 +92,18 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public long getMaxOffsetInQuque(String topic, int queueId) {
- return next.getMaxOffsetInQuque(topic, queueId);
+ public long getMaxOffsetInQueue(String topic, int queueId) {
+ return next.getMaxOffsetInQueue(topic, queueId);
}
@Override
- public long getMinOffsetInQuque(String topic, int queueId) {
- return next.getMinOffsetInQuque(topic, queueId);
+ public long getMinOffsetInQueue(String topic, int queueId) {
+ return next.getMinOffsetInQueue(topic, queueId);
}
@Override
- public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
- return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
+ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
+ return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
}
@Override
@@ -152,8 +152,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
- return next.getMessageStoreTimeStamp(topic, queueId, offset);
+ public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
+ return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
}
@Override
@@ -172,8 +172,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public void excuteDeleteFilesManualy() {
- next.excuteDeleteFilesManualy();
+ public void executeDeleteFilesManually() {
+ next.executeDeleteFilesManually();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index daea53c..f59d295 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -376,7 +376,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
- long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
+ long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
@@ -391,7 +391,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
- long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
+ long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
@@ -537,11 +537,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq.setQueueId(i);
TopicOffset topicOffset = new TopicOffset();
- long min = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, i);
+ long min = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, i);
if (min < 0)
min = 0;
- long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (max < 0)
max = 0;
@@ -679,7 +679,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
OffsetWrapper offsetWrapper = new OffsetWrapper();
- long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
@@ -862,7 +862,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long minTime = this.brokerController.getMessageStore().getEarliestMessageTime(topic, i);
timeSpan.setMinTimeStamp(minTime);
- long max = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1);
timeSpan.setMaxTimeStamp(maxTime);
@@ -876,7 +876,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
timeSpan.setConsumeTimeStamp(consumeTime);
- long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), i);
+ long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i);
if (consumerOffset < maxBrokerOffset) {
long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset);
timeSpan.setDelayTime(System.currentTimeMillis() - nextTime);
@@ -1126,7 +1126,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
- long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 2c1029c..bb42705 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -135,7 +135,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
} else {
long minOffset =
- this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
+ this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7bed62c..931edc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -580,7 +580,7 @@ public class DefaultMessageStore implements MessageStore {
/**
*/
- public long getMaxOffsetInQuque(String topic, int queueId) {
+ public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQueue();
@@ -593,7 +593,7 @@ public class DefaultMessageStore implements MessageStore {
/**
*/
- public long getMinOffsetInQuque(String topic, int queueId) {
+ public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMinOffsetInQueue();
@@ -603,10 +603,10 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
+ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
- SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(cqOffset);
+ SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset);
if (bufferConsumeQueue != null) {
try {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
@@ -740,10 +740,10 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
+ public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
if (logicQueue != null) {
- SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset);
+ SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
if (result != null) {
try {
final long phyOffset = result.getByteBuffer().getLong();
@@ -798,7 +798,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public void excuteDeleteFilesManualy() {
+ public void executeDeleteFilesManually() {
this.cleanCommitLogService.excuteDeleteFilesManualy();
}
@@ -1434,7 +1434,7 @@ public class DefaultMessageStore implements MessageStore {
public void excuteDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
- DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
+ DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index e841c08..55572ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -22,91 +22,304 @@ import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
+/**
+ * This interface defines the contract a message store must implement, allowing third-party vendors to plug in a customized message store.
+ */
public interface MessageStore {
+ /**
+ * Load previously stored messages.
+ * @return true if success; false otherwise.
+ */
boolean load();
+ /**
+ * Launch this message store.
+ * @throws Exception if there is any error.
+ */
void start() throws Exception;
+ /**
+ * Shutdown this message store.
+ */
void shutdown();
+ /**
+ * Destroy this message store. Generally, all persistent files should be removed after invocation.
+ */
void destroy();
+ /**
+ * Store a message into store.
+ * @param msg Message instance to store
+ * @return result of store operation.
+ */
PutMessageResult putMessage(final MessageExtBrokerInner msg);
+ /**
+ * Store a batch of messages.
+ * @param messageExtBatch Message batch.
+ * @return result of storing batch messages.
+ */
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
+ /**
+ * Query at most <code>maxMsgNums</code> messages belonging to <code>topic</code> at <code>queueId</code> starting
+ * from given <code>offset</code>. Resulting messages will further be screened using provided message filter.
+ *
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
+ * @param messageFilter Message filter used to screen desired messages.
+ * @return Matched messages.
+ */
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter);
- long getMaxOffsetInQuque(final String topic, final int queueId);
-
- long getMinOffsetInQuque(final String topic, final int queueId);
-
- long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset);
-
+ /**
+ * Get maximum offset of the topic queue.
+ * @param topic Topic name.
+ * @param queueId Queue ID.
+ * @return Maximum offset at present.
+ */
+ long getMaxOffsetInQueue(final String topic, final int queueId);
+
+ /**
+ * Get the minimum offset of the topic queue.
+ * @param topic Topic name.
+ * @param queueId Queue ID.
+ * @return Minimum offset at present.
+ */
+ long getMinOffsetInQueue(final String topic, final int queueId);
+
+ /**
+ * Get the offset of the message in the commit log, which is also known as physical offset.
+ * @param topic Topic of the message to lookup.
+ * @param queueId Queue ID.
+ * @param consumeQueueOffset offset of consume queue.
+ * @return physical offset.
+ */
+ long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
+
+ /**
+ * Look up the consume queue offset of the message whose store timestamp is as specified.
+ * @param topic Topic of the message.
+ * @param queueId Queue ID.
+ * @param timestamp Timestamp to look up.
+ * @return consume queue (logical) offset which matches.
+ */
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
+ /**
+ * Look up the message by given commit log offset.
+ * @param commitLogOffset physical offset.
+ * @return Message whose physical offset is as specified.
+ */
MessageExt lookMessageByOffset(final long commitLogOffset);
+ /**
+ * Get one message from the specified commit log offset.
+ * @param commitLogOffset commit log offset.
+ * @return wrapped result of the message.
+ */
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
+ /**
+ * Get one message from the specified commit log offset.
+ * @param commitLogOffset commit log offset.
+ * @param msgSize message size.
+ * @return wrapped result of the message.
+ */
SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
+ /**
+ * Get the running information of this store.
+ * @return message store running info.
+ */
String getRunningDataInfo();
+ /**
+ * Message store runtime information, which should generally contain various statistical information.
+ * @return runtime information of the message store in format of key-value pairs.
+ */
HashMap<String, String> getRuntimeInfo();
+ /**
+ * Get the maximum commit log offset.
+ * @return maximum commit log offset.
+ */
long getMaxPhyOffset();
+ /**
+ * Get the minimum commit log offset.
+ * @return minimum commit log offset.
+ */
long getMinPhyOffset();
+ /**
+ * Get the store time of the earliest message in the given queue.
+ * @param topic Topic of the messages to query.
+ * @param queueId Queue ID to find.
+ * @return store time of the earliest message.
+ */
long getEarliestMessageTime(final String topic, final int queueId);
+ /**
+ * Get the store time of the earliest message in this store.
+ * @return timestamp of the earliest message in this store.
+ */
long getEarliestMessageTime();
- long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset);
-
+ /**
+ * Get the store time of the message specified.
+ * @param topic message topic.
+ * @param queueId queue ID.
+ * @param consumeQueueOffset consume queue offset.
+ * @return store timestamp of the message.
+ */
+ long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
+
+ /**
+ * Get the total number of the messages in the specified queue.
+ * @param topic Topic
+ * @param queueId Queue ID.
+ * @return total number.
+ */
long getMessageTotalInQueue(final String topic, final int queueId);
+ /**
+ * Get the raw commit log data starting from the given offset, which should be used for replication purposes.
+ * @param offset starting offset.
+ * @return commit log data.
+ */
SelectMappedBufferResult getCommitLogData(final long offset);
+ /**
+ * Append data to commit log.
+ * @param startOffset starting offset.
+ * @param data data to append.
+ * @return true if success; false otherwise.
+ */
boolean appendToCommitLog(final long startOffset, final byte[] data);
- void excuteDeleteFilesManualy();
-
- QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
- final long begin, final long end);
-
+ /**
+ * Execute file deletion manually.
+ */
+ void executeDeleteFilesManually();
+
+ /**
+ * Query messages by given key.
+ * @param topic topic of the message.
+ * @param key message key.
+ * @param maxNum maximum number of the messages possible.
+ * @param begin begin timestamp.
+ * @param end end timestamp.
+ * @return result of the message query.
+ */
+ QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
+ final long end);
+
+ /**
+ * Update HA master address.
+ * @param newAddr new address.
+ */
void updateHaMasterAddress(final String newAddr);
+ /**
+ * Return how much the slave falls behind.
+ * @return number of bytes that slave falls behind.
+ */
long slaveFallBehindMuch();
+ /**
+ * Return the current timestamp of the store.
+ * @return current time in milliseconds since 1970-01-01.
+ */
long now();
+ /**
+ * Clean unused topics.
+ * @param topics all valid topics.
+ * @return number of the topics deleted.
+ */
int cleanUnusedTopic(final Set<String> topics);
+ /**
+ * Clean expired consume queues.
+ */
void cleanExpiredConsumerQueue();
+ /**
+ * Check if the given message has been swapped out of the memory.
+ * @param topic topic.
+ * @param queueId queue ID.
+ * @param consumeOffset consume queue offset.
+ * @return true if the message is no longer in memory; false otherwise.
+ */
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
+ /**
+ * Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue.
+ * @return number of the bytes to dispatch.
+ */
long dispatchBehindBytes();
+ /**
+ * Flush the message store to persist all data.
+ * @return maximum offset flushed to persistent storage device.
+ */
long flush();
+ /**
+ * Reset written offset.
+ * @param phyOffset new offset.
+ * @return true if success; false otherwise.
+ */
boolean resetWriteOffset(long phyOffset);
+ /**
+ * Get confirm offset.
+ * @return confirm offset.
+ */
long getConfirmOffset();
+ /**
+ * Set confirm offset.
+ * @param phyOffset confirm offset to set.
+ */
void setConfirmOffset(long phyOffset);
+ /**
+ * Check if the operating system page cache is busy or not.
+ * @return true if the OS page cache is busy; false otherwise.
+ */
boolean isOSPageCacheBusy();
+ /**
+ * Get lock time in milliseconds of the store so far.
+ * @return lock time in milliseconds.
+ */
long lockTimeMills();
+ /**
+ * Check if the transient store pool is deficient.
+ * @return true if the transient store pool is running out; false otherwise.
+ */
boolean isTransientStorePoolDeficient();
+ /**
+ * Get the dispatcher list.
+ * @return list of the dispatcher.
+ */
LinkedList<CommitLogDispatcher> getDispatcherList();
+ /**
+ * Get consume queue of the topic/queue.
+ * @param topic Topic.
+ * @param queueId Queue ID.
+ * @return Consume queue.
+ */
ConsumeQueue getConsumeQueue(String topic, int queueId);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e9814ad4/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index d45b994..501876e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -79,7 +79,7 @@ public class ScheduleMessageService extends ConfigManager {
Entry<Integer, Long> next = it.next();
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
- long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId);
+ long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
String value = String.format("%d,%d", delayOffset, maxOffset);
String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
stats.put(key, value);