You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:24 UTC
[07/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
new file mode 100644
index 0000000..00f9833
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.store.CommitLog.GroupCommitRequest;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class HAService {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+ private final AtomicInteger connectionCount = new AtomicInteger(0);
+
+ private final List<HAConnection> connectionList = new LinkedList<HAConnection>();
+
+ private final AcceptSocketService acceptSocketService;
+
+ private final DefaultMessageStore defaultMessageStore;
+
+ private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
+ private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
+
+ private final GroupTransferService groupTransferService;
+
+ private final HAClient haClient;
+
+
+ public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
+ this.defaultMessageStore = defaultMessageStore;
+ this.acceptSocketService =
+ new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ this.groupTransferService = new GroupTransferService();
+ this.haClient = new HAClient();
+ }
+
+
+ public void updateMasterAddress(final String newAddr) {
+ if (this.haClient != null) {
+ this.haClient.updateMasterAddress(newAddr);
+ }
+ }
+
+
+ public void putRequest(final GroupCommitRequest request) {
+ this.groupTransferService.putRequest(request);
+ }
+
+
+ public boolean isSlaveOK(final long masterPutWhere) {
+ boolean result = this.connectionCount.get() > 0;
+ result =
+ result
+ && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
+ .getMessageStoreConfig().getHaSlaveFallbehindMax());
+ return result;
+ }
+
+
+ /**
+
+ */
+ public void notifyTransferSome(final long offset) {
+ for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
+ boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
+ if (ok) {
+ this.groupTransferService.notifyTransferSome();
+ break;
+ } else {
+ value = this.push2SlaveMaxOffset.get();
+ }
+ }
+ }
+
+
+ public AtomicInteger getConnectionCount() {
+ return connectionCount;
+ }
+
+
+ // public void notifyTransferSome() {
+ // this.groupTransferService.notifyTransferSome();
+ // }
+
+ public void start() {
+ this.acceptSocketService.beginAccept();
+ this.acceptSocketService.start();
+ this.groupTransferService.start();
+ this.haClient.start();
+ }
+
+
+ public void addConnection(final HAConnection conn) {
+ synchronized (this.connectionList) {
+ this.connectionList.add(conn);
+ }
+ }
+
+
+ public void removeConnection(final HAConnection conn) {
+ synchronized (this.connectionList) {
+ this.connectionList.remove(conn);
+ }
+ }
+
+
+ public void shutdown() {
+ this.haClient.shutdown();
+ this.acceptSocketService.shutdown(true);
+ this.destroyConnections();
+ this.groupTransferService.shutdown();
+ }
+
+
+ public void destroyConnections() {
+ synchronized (this.connectionList) {
+ for (HAConnection c : this.connectionList) {
+ c.shutdown();
+ }
+
+ this.connectionList.clear();
+ }
+ }
+
+
+ public DefaultMessageStore getDefaultMessageStore() {
+ return defaultMessageStore;
+ }
+
+
+ public WaitNotifyObject getWaitNotifyObject() {
+ return waitNotifyObject;
+ }
+
+ public AtomicLong getPush2SlaveMaxOffset() {
+ return push2SlaveMaxOffset;
+ }
+
+ class AcceptSocketService extends ServiceThread {
+ private ServerSocketChannel serverSocketChannel;
+ private Selector selector;
+ private SocketAddress socketAddressListen;
+
+
+ public AcceptSocketService(final int port) {
+ this.socketAddressListen = new InetSocketAddress(port);
+ }
+
+
+ public void beginAccept() {
+ try {
+ this.serverSocketChannel = ServerSocketChannel.open();
+ this.selector = RemotingUtil.openSelector();
+ this.serverSocketChannel.socket().setReuseAddress(true);
+ this.serverSocketChannel.socket().bind(this.socketAddressListen);
+ this.serverSocketChannel.configureBlocking(false);
+ this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
+ } catch (Exception e) {
+ log.error("beginAccept exception", e);
+ }
+ }
+
+
+ @Override
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.selector.select(1000);
+ Set<SelectionKey> selected = this.selector.selectedKeys();
+ if (selected != null) {
+ for (SelectionKey k : selected) {
+ if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
+ SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+ if (sc != null) {
+ HAService.log.info("HAService receive new connection, "
+ + sc.socket().getRemoteSocketAddress());
+
+ try {
+ HAConnection conn = new HAConnection(HAService.this, sc);
+ conn.start();
+ HAService.this.addConnection(conn);
+ } catch (Exception e) {
+ log.error("new HAConnection exception", e);
+ sc.close();
+ }
+ }
+ } else {
+ log.warn("Unexpected ops in select " + k.readyOps());
+ }
+ }
+
+ selected.clear();
+ }
+
+ } catch (Exception e) {
+ log.error(this.getServiceName() + " service has exception.", e);
+ }
+ }
+
+ log.error(this.getServiceName() + " service end");
+ }
+
+
+ @Override
+ public String getServiceName() {
+ return AcceptSocketService.class.getSimpleName();
+ }
+ }
+
+ /**
+ * GroupTransferService Service
+ */
+ class GroupTransferService extends ServiceThread {
+
+ private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
+ private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
+ private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+
+
+ public void putRequest(final GroupCommitRequest request) {
+ synchronized (this) {
+ this.requestsWrite.add(request);
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+ }
+ }
+
+
+ public void notifyTransferSome() {
+ this.notifyTransferObject.wakeup();
+ }
+
+
+ private void swapRequests() {
+ List<GroupCommitRequest> tmp = this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ }
+
+
+ private void doWaitTransfer() {
+ if (!this.requestsRead.isEmpty()) {
+ for (GroupCommitRequest req : this.requestsRead) {
+ boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+ for (int i = 0; !transferOK && i < 5; i++) {
+ this.notifyTransferObject.waitForRunning(1000);
+ transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+ }
+
+ if (!transferOK) {
+ log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
+ }
+
+ req.wakeupCustomer(transferOK);
+ }
+
+ this.requestsRead.clear();
+ }
+ }
+
+
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.waitForRunning(0);
+ this.doWaitTransfer();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+
+ @Override
+ protected void onWaitEnd() {
+ this.swapRequests();
+ }
+
+
+ @Override
+ public String getServiceName() {
+ return GroupTransferService.class.getSimpleName();
+ }
+ }
+
+ class HAClient extends ServiceThread {
+ private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+ private final AtomicReference<String> masterAddress = new AtomicReference<String>();
+ private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+ private SocketChannel socketChannel;
+ private Selector selector;
+ private long lastWriteTimestamp = System.currentTimeMillis();
+
+ private long currentReportedOffset = 0;
+ private int dispatchPostion = 0;
+ private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+ private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+
+
+ public HAClient() throws IOException {
+ this.selector = RemotingUtil.openSelector();
+ }
+
+
+ public void updateMasterAddress(final String newAddr) {
+ String currentAddr = this.masterAddress.get();
+ if (currentAddr == null || !currentAddr.equals(newAddr)) {
+ this.masterAddress.set(newAddr);
+ log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);
+ }
+ }
+
+
+ private boolean isTimeToReportOffset() {
+ long interval =
+ HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
+ boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
+ .getHaSendHeartbeatInterval();
+
+ return needHeart;
+ }
+
+
+ private boolean reportSlaveMaxOffset(final long maxOffset) {
+ this.reportOffset.position(0);
+ this.reportOffset.limit(8);
+ this.reportOffset.putLong(maxOffset);
+ this.reportOffset.position(0);
+ this.reportOffset.limit(8);
+
+ for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
+ try {
+ this.socketChannel.write(this.reportOffset);
+ } catch (IOException e) {
+ log.error(this.getServiceName()
+ + "reportSlaveMaxOffset this.socketChannel.write exception", e);
+ return false;
+ }
+ }
+
+ return !this.reportOffset.hasRemaining();
+ }
+
+
+ // private void reallocateByteBuffer() {
+ // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+ // int remain = this.byteBufferRead.limit() - this.dispatchPostion;
+ // bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain);
+ // this.dispatchPostion = 0;
+ // this.byteBufferRead = bb;
+ // }
+
+ /**
+
+ */
+ private void reallocateByteBuffer() {
+ int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
+ if (remain > 0) {
+ this.byteBufferRead.position(this.dispatchPostion);
+
+ this.byteBufferBackup.position(0);
+ this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
+ this.byteBufferBackup.put(this.byteBufferRead);
+ }
+
+ this.swapByteBuffer();
+
+ this.byteBufferRead.position(remain);
+ this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+ this.dispatchPostion = 0;
+ }
+
+
+ private void swapByteBuffer() {
+ ByteBuffer tmp = this.byteBufferRead;
+ this.byteBufferRead = this.byteBufferBackup;
+ this.byteBufferBackup = tmp;
+ }
+
+
+ private boolean processReadEvent() {
+ int readSizeZeroTimes = 0;
+ while (this.byteBufferRead.hasRemaining()) {
+ try {
+ int readSize = this.socketChannel.read(this.byteBufferRead);
+ if (readSize > 0) {
+ lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
+ readSizeZeroTimes = 0;
+ boolean result = this.dispatchReadRequest();
+ if (!result) {
+ log.error("HAClient, dispatchReadRequest error");
+ return false;
+ }
+ } else if (readSize == 0) {
+ if (++readSizeZeroTimes >= 3) {
+ break;
+ }
+ } else {
+ // TODO ERROR
+ log.info("HAClient, processReadEvent read socket < 0");
+ return false;
+ }
+ } catch (IOException e) {
+ log.info("HAClient, processReadEvent read socket exception", e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+ private boolean dispatchReadRequest() {
+ final int msgHeaderSize = 8 + 4; // phyoffset + size
+ int readSocketPos = this.byteBufferRead.position();
+
+ while (true) {
+ int diff = this.byteBufferRead.position() - this.dispatchPostion;
+ if (diff >= msgHeaderSize) {
+ long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
+ int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
+
+ long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+
+
+ if (slavePhyOffset != 0) {
+ if (slavePhyOffset != masterPhyOffset) {
+ log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ + slavePhyOffset + " MASTER: " + masterPhyOffset);
+ return false;
+ }
+ }
+
+
+ if (diff >= (msgHeaderSize + bodySize)) {
+ byte[] bodyData = new byte[bodySize];
+ this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
+ this.byteBufferRead.get(bodyData);
+
+
+ HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
+
+ this.byteBufferRead.position(readSocketPos);
+ this.dispatchPostion += msgHeaderSize + bodySize;
+
+ if (!reportSlaveMaxOffsetPlus()) {
+ return false;
+ }
+
+ continue;
+ }
+ }
+
+ if (!this.byteBufferRead.hasRemaining()) {
+ this.reallocateByteBuffer();
+ }
+
+ break;
+ }
+
+ return true;
+ }
+
+
+ private boolean reportSlaveMaxOffsetPlus() {
+ boolean result = true;
+ long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+ if (currentPhyOffset > this.currentReportedOffset) {
+ this.currentReportedOffset = currentPhyOffset;
+ result = this.reportSlaveMaxOffset(this.currentReportedOffset);
+ if (!result) {
+ this.closeMaster();
+ log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
+ }
+ }
+
+ return result;
+ }
+
+
+ private boolean connectMaster() throws ClosedChannelException {
+ if (null == socketChannel) {
+ String addr = this.masterAddress.get();
+ if (addr != null) {
+
+ SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+ if (socketAddress != null) {
+ this.socketChannel = RemotingUtil.connect(socketAddress);
+ if (this.socketChannel != null) {
+ this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+ }
+ }
+ }
+
+ this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+
+ this.lastWriteTimestamp = System.currentTimeMillis();
+ }
+
+ return this.socketChannel != null;
+ }
+
+
+ private void closeMaster() {
+ if (null != this.socketChannel) {
+ try {
+
+ SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ if (sk != null) {
+ sk.cancel();
+ }
+
+ this.socketChannel.close();
+
+ this.socketChannel = null;
+ } catch (IOException e) {
+ log.warn("closeMaster exception. ", e);
+ }
+
+ this.lastWriteTimestamp = 0;
+ this.dispatchPostion = 0;
+
+ this.byteBufferBackup.position(0);
+ this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
+
+ this.byteBufferRead.position(0);
+ this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+ }
+ }
+
+
+ @Override
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ if (this.connectMaster()) {
+
+ if (this.isTimeToReportOffset()) {
+ boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
+ if (!result) {
+ this.closeMaster();
+ }
+ }
+
+
+ this.selector.select(1000);
+
+
+ boolean ok = this.processReadEvent();
+ if (!ok) {
+ this.closeMaster();
+ }
+
+ if (!reportSlaveMaxOffsetPlus()) {
+ continue;
+ }
+
+
+ long interval =
+ HAService.this.getDefaultMessageStore().getSystemClock().now()
+ - this.lastWriteTimestamp;
+ if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
+ .getHaHousekeepingInterval()) {
+ log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ + "] expired, " + interval);
+ this.closeMaster();
+ log.warn("HAClient, master not response some time, so close connection");
+ }
+ } else {
+ this.waitForRunning(1000 * 5);
+ }
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ this.waitForRunning(1000 * 5);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+
+ //
+ // private void disableWriteFlag() {
+ // if (this.socketChannel != null) {
+ // SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ // if (sk != null) {
+ // int ops = sk.interestOps();
+ // ops &= ~SelectionKey.OP_WRITE;
+ // sk.interestOps(ops);
+ // }
+ // }
+ // }
+ //
+ //
+ // private void enableWriteFlag() {
+ // if (this.socketChannel != null) {
+ // SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ // if (sk != null) {
+ // int ops = sk.interestOps();
+ // ops |= SelectionKey.OP_WRITE;
+ // sk.interestOps(ops);
+ // }
+ // }
+ // }
+
+ @Override
+ public String getServiceName() {
+ return HAClient.class.getSimpleName();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
new file mode 100644
index 0000000..f540cdb
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import java.util.HashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class WaitNotifyObject {
+
+ protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
+ new HashMap<Long, Boolean>(16);
+
+ protected volatile boolean hasNotified = false;
+
+
+ public void wakeup() {
+ synchronized (this) {
+ if (!this.hasNotified) {
+ this.hasNotified = true;
+ this.notify();
+ }
+ }
+ }
+
+
+ protected void waitForRunning(long interval) {
+ synchronized (this) {
+ if (this.hasNotified) {
+ this.hasNotified = false;
+ this.onWaitEnd();
+ return;
+ }
+
+ try {
+ this.wait(interval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ this.hasNotified = false;
+ this.onWaitEnd();
+ }
+ }
+ }
+
+
+ protected void onWaitEnd() {
+ }
+
+ public void wakeupAll() {
+ synchronized (this) {
+ boolean needNotify = false;
+
+ for (Boolean value : this.waitingThreadTable.values()) {
+ needNotify = needNotify || !value;
+ value = true;
+ }
+
+ if (needNotify) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public void allWaitForRunning(long interval) {
+ long currentThreadId = Thread.currentThread().getId();
+ synchronized (this) {
+ Boolean notified = this.waitingThreadTable.get(currentThreadId);
+ if (notified != null && notified) {
+ this.waitingThreadTable.put(currentThreadId, false);
+ this.onWaitEnd();
+ return;
+ }
+
+ try {
+ this.wait(interval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ this.waitingThreadTable.put(currentThreadId, false);
+ this.onWaitEnd();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
new file mode 100644
index 0000000..f353320
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.MappedFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class IndexFile {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private static int hashSlotSize = 4;
+ private static int indexSize = 20;
+ private static int invalidIndex = 0;
+ private final int hashSlotNum;
+ private final int indexNum;
+ private final MappedFile mappedFile;
+ private final FileChannel fileChannel;
+ private final MappedByteBuffer mappedByteBuffer;
+ private final IndexHeader indexHeader;
+
+
+ public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
+ final long endPhyOffset, final long endTimestamp) throws IOException {
+ int fileTotalSize =
+ IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
+ this.mappedFile = new MappedFile(fileName, fileTotalSize);
+ this.fileChannel = this.mappedFile.getFileChannel();
+ this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
+ this.hashSlotNum = hashSlotNum;
+ this.indexNum = indexNum;
+
+ ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
+ this.indexHeader = new IndexHeader(byteBuffer);
+
+ if (endPhyOffset > 0) {
+ this.indexHeader.setBeginPhyOffset(endPhyOffset);
+ this.indexHeader.setEndPhyOffset(endPhyOffset);
+ }
+
+ if (endTimestamp > 0) {
+ this.indexHeader.setBeginTimestamp(endTimestamp);
+ this.indexHeader.setEndTimestamp(endTimestamp);
+ }
+ }
+
+
+ public String getFileName() {
+ return this.mappedFile.getFileName();
+ }
+
+
+ public void load() {
+ this.indexHeader.load();
+ }
+
+
+ public void flush() {
+ long beginTime = System.currentTimeMillis();
+ if (this.mappedFile.hold()) {
+ this.indexHeader.updateByteBuffer();
+ this.mappedByteBuffer.force();
+ this.mappedFile.release();
+ log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime));
+ }
+ }
+
+ public boolean isWriteFull() {
+ return this.indexHeader.getIndexCount() >= this.indexNum;
+ }
+
+
+ public boolean destroy(final long intervalForcibly) {
+ return this.mappedFile.destroy(intervalForcibly);
+ }
+
+ public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
+ if (this.indexHeader.getIndexCount() < this.indexNum) {
+ int keyHash = indexKeyHashMethod(key);
+ int slotPos = keyHash % this.hashSlotNum;
+ int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
+
+ FileLock fileLock = null;
+
+ try {
+
+ // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
+ // false);
+ int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
+ if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
+ slotValue = invalidIndex;
+ }
+
+ long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
+
+
+ timeDiff = timeDiff / 1000;
+
+
+ if (this.indexHeader.getBeginTimestamp() <= 0) {
+ timeDiff = 0;
+ } else if (timeDiff > Integer.MAX_VALUE) {
+ timeDiff = Integer.MAX_VALUE;
+ } else if (timeDiff < 0) {
+ timeDiff = 0;
+ }
+
+ int absIndexPos =
+ IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ + this.indexHeader.getIndexCount() * indexSize;
+
+
+ this.mappedByteBuffer.putInt(absIndexPos, keyHash);
+ this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
+ this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
+ this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
+
+
+ this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
+
+
+ if (this.indexHeader.getIndexCount() <= 1) {
+ this.indexHeader.setBeginPhyOffset(phyOffset);
+ this.indexHeader.setBeginTimestamp(storeTimestamp);
+ }
+
+ this.indexHeader.incHashSlotCount();
+ this.indexHeader.incIndexCount();
+ this.indexHeader.setEndPhyOffset(phyOffset);
+ this.indexHeader.setEndTimestamp(storeTimestamp);
+
+ return true;
+ } catch (Exception e) {
+ log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
+ } finally {
+ if (fileLock != null) {
+ try {
+ fileLock.release();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ } else {
+ log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
+ + this.indexNum);
+ }
+
+ return false;
+ }
+
+ public int indexKeyHashMethod(final String key) {
+ int keyHash = key.hashCode();
+ int keyHashPositive = Math.abs(keyHash);
+ if (keyHashPositive < 0)
+ keyHashPositive = 0;
+ return keyHashPositive;
+ }
+
+ public long getBeginTimestamp() {
+ return this.indexHeader.getBeginTimestamp();
+ }
+
+ public long getEndTimestamp() {
+ return this.indexHeader.getEndTimestamp();
+ }
+
+ public long getEndPhyOffset() {
+ return this.indexHeader.getEndPhyOffset();
+ }
+
+ public boolean isTimeMatched(final long begin, final long end) {
+ boolean result =
+ begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
+
+ result =
+ result
+ || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
+ .getEndTimestamp());
+
+ result =
+ result
+ || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
+ .getEndTimestamp());
+ return result;
+ }
+
+ public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
+ final long begin, final long end, boolean lock) {
+ if (this.mappedFile.hold()) {
+ int keyHash = indexKeyHashMethod(key);
+ int slotPos = keyHash % this.hashSlotNum;
+ int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
+
+ FileLock fileLock = null;
+ try {
+ if (lock) {
+ // fileLock = this.fileChannel.lock(absSlotPos,
+ // hashSlotSize, true);
+ }
+
+ int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
+ // if (fileLock != null) {
+ // fileLock.release();
+ // fileLock = null;
+ // }
+
+ if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
+ || this.indexHeader.getIndexCount() <= 1) {
+ // TODO NOTFOUND
+ } else {
+ for (int nextIndexToRead = slotValue;;) {
+ if (phyOffsets.size() >= maxNum) {
+ break;
+ }
+
+ int absIndexPos =
+ IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ + nextIndexToRead * indexSize;
+
+ int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
+ long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
+
+ long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
+ int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
+
+
+ if (timeDiff < 0) {
+ break;
+ }
+
+ timeDiff *= 1000L;
+
+ long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
+ boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
+
+ if (keyHash == keyHashRead && timeMatched) {
+ phyOffsets.add(phyOffsetRead);
+ }
+
+ if (prevIndexRead <= invalidIndex
+ || prevIndexRead > this.indexHeader.getIndexCount()
+ || prevIndexRead == nextIndexToRead || timeRead < begin) {
+ break;
+ }
+
+ nextIndexToRead = prevIndexRead;
+ }
+ }
+ } catch (Exception e) {
+ log.error("selectPhyOffset exception ", e);
+ } finally {
+ if (fileLock != null) {
+ try {
+ fileLock.release();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ this.mappedFile.release();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
new file mode 100644
index 0000000..d6015e3
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+
+ *
+ * @author shijia.wxr
+ *
+ */
+public class IndexHeader {
+ public static final int INDEX_HEADER_SIZE = 40;
+ private static int beginTimestampIndex = 0;
+ private static int endTimestampIndex = 8;
+ private static int beginPhyoffsetIndex = 16;
+ private static int endPhyoffsetIndex = 24;
+ private static int hashSlotcountIndex = 32;
+ private static int indexCountIndex = 36;
+ private final ByteBuffer byteBuffer;
+ private AtomicLong beginTimestamp = new AtomicLong(0);
+ private AtomicLong endTimestamp = new AtomicLong(0);
+ private AtomicLong beginPhyOffset = new AtomicLong(0);
+ private AtomicLong endPhyOffset = new AtomicLong(0);
+ private AtomicInteger hashSlotCount = new AtomicInteger(0);
+
+ private AtomicInteger indexCount = new AtomicInteger(1);
+
+
+ public IndexHeader(final ByteBuffer byteBuffer) {
+ this.byteBuffer = byteBuffer;
+ }
+
+
+ public void load() {
+ this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
+ this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
+ this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
+ this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
+
+ this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
+ this.indexCount.set(byteBuffer.getInt(indexCountIndex));
+
+ if (this.indexCount.get() <= 0) {
+ this.indexCount.set(1);
+ }
+ }
+
+ public void updateByteBuffer() {
+ this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
+ this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
+ this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
+ this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
+ this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
+ this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
+ }
+
+
+ public long getBeginTimestamp() {
+ return beginTimestamp.get();
+ }
+
+
+ public void setBeginTimestamp(long beginTimestamp) {
+ this.beginTimestamp.set(beginTimestamp);
+ this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp);
+ }
+
+
+ public long getEndTimestamp() {
+ return endTimestamp.get();
+ }
+
+
+ public void setEndTimestamp(long endTimestamp) {
+ this.endTimestamp.set(endTimestamp);
+ this.byteBuffer.putLong(endTimestampIndex, endTimestamp);
+ }
+
+
+ public long getBeginPhyOffset() {
+ return beginPhyOffset.get();
+ }
+
+
+ public void setBeginPhyOffset(long beginPhyOffset) {
+ this.beginPhyOffset.set(beginPhyOffset);
+ this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset);
+ }
+
+
+ public long getEndPhyOffset() {
+ return endPhyOffset.get();
+ }
+
+
+ public void setEndPhyOffset(long endPhyOffset) {
+ this.endPhyOffset.set(endPhyOffset);
+ this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset);
+ }
+
+
+ public AtomicInteger getHashSlotCount() {
+ return hashSlotCount;
+ }
+
+
+ public void incHashSlotCount() {
+ int value = this.hashSlotCount.incrementAndGet();
+ this.byteBuffer.putInt(hashSlotcountIndex, value);
+ }
+
+
+ public int getIndexCount() {
+ return indexCount.get();
+ }
+
+
+ public void incIndexCount() {
+ int value = this.indexCount.incrementAndGet();
+ this.byteBuffer.putInt(indexCountIndex, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
new file mode 100644
index 0000000..f275f80
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import com.alibaba.rocketmq.store.DispatchRequest;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class IndexService {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private final DefaultMessageStore defaultMessageStore;
+
+ private final int hashSlotNum;
+ private final int indexNum;
+ private final String storePath;
+
+ private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+
+ public IndexService(final DefaultMessageStore store) {
+ this.defaultMessageStore = store;
+ this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
+ this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
+ this.storePath =
+ StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
+ }
+
+
+ public boolean load(final boolean lastExitOK) {
+ File dir = new File(this.storePath);
+ File[] files = dir.listFiles();
+ if (files != null) {
+ // ascending order
+ Arrays.sort(files);
+ for (File file : files) {
+ try {
+ IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
+ f.load();
+
+ if (!lastExitOK) {
+ if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
+ .getIndexMsgTimestamp()) {
+ f.destroy(0);
+ continue;
+ }
+ }
+
+ log.info("load index file OK, " + f.getFileName());
+ this.indexFileList.add(f);
+ } catch (IOException e) {
+ log.error("load file " + file + " error", e);
+ return false;
+ } catch (NumberFormatException e) {
+ continue;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public void deleteExpiredFile(long offset) {
+ Object[] files = null;
+ try {
+ this.readWriteLock.readLock().lock();
+ if (this.indexFileList.isEmpty()) {
+ return;
+ }
+
+ long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
+ if (endPhyOffset < offset) {
+ files = this.indexFileList.toArray();
+ }
+ } catch (Exception e) {
+ log.error("destroy exception", e);
+ } finally {
+ this.readWriteLock.readLock().unlock();
+ }
+
+ if (files != null) {
+ List<IndexFile> fileList = new ArrayList<IndexFile>();
+ for (int i = 0; i < (files.length - 1); i++) {
+ IndexFile f = (IndexFile) files[i];
+ if (f.getEndPhyOffset() < offset) {
+ fileList.add(f);
+ } else {
+ break;
+ }
+ }
+
+ this.deleteExpiredFile(fileList);
+ }
+ }
+
+ private void deleteExpiredFile(List<IndexFile> files) {
+ if (!files.isEmpty()) {
+ try {
+ this.readWriteLock.writeLock().lock();
+ for (IndexFile file : files) {
+ boolean destroyed = file.destroy(3000);
+ destroyed = destroyed && this.indexFileList.remove(file);
+ if (!destroyed) {
+ log.error("deleteExpiredFile remove failed.");
+ break;
+ }
+ }
+ } catch (Exception e) {
+ log.error("deleteExpiredFile has exception.", e);
+ } finally {
+ this.readWriteLock.writeLock().unlock();
+ }
+ }
+ }
+
+
+ public void destroy() {
+ try {
+ this.readWriteLock.readLock().lock();
+ for (IndexFile f : this.indexFileList) {
+ f.destroy(1000 * 3);
+ }
+ this.indexFileList.clear();
+ } catch (Exception e) {
+ log.error("destroy exception", e);
+ } finally {
+ this.readWriteLock.readLock().unlock();
+ }
+ }
+
+
+ public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
+ List<Long> phyOffsets = new ArrayList<Long>(maxNum);
+
+ long indexLastUpdateTimestamp = 0;
+ long indexLastUpdatePhyoffset = 0;
+ maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
+ try {
+ this.readWriteLock.readLock().lock();
+ if (!this.indexFileList.isEmpty()) {
+ for (int i = this.indexFileList.size(); i > 0; i--) {
+ IndexFile f = this.indexFileList.get(i - 1);
+ boolean lastFile = i == this.indexFileList.size();
+ if (lastFile) {
+ indexLastUpdateTimestamp = f.getEndTimestamp();
+ indexLastUpdatePhyoffset = f.getEndPhyOffset();
+ }
+
+ if (f.isTimeMatched(begin, end)) {
+
+ f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
+ }
+
+
+ if (f.getBeginTimestamp() < begin) {
+ break;
+ }
+
+ if (phyOffsets.size() >= maxNum) {
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("queryMsg exception", e);
+ } finally {
+ this.readWriteLock.readLock().unlock();
+ }
+
+ return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
+ }
+
+
+ private String buildKey(final String topic, final String key) {
+ return topic + "#" + key;
+ }
+
+
+ public void buildIndex(DispatchRequest req) {
+ IndexFile indexFile = retryGetAndCreateIndexFile();
+ if (indexFile != null) {
+ long endPhyOffset = indexFile.getEndPhyOffset();
+ DispatchRequest msg = req;
+ String topic = msg.getTopic();
+ String keys = msg.getKeys();
+ if (msg.getCommitLogOffset() < endPhyOffset) {
+ return;
+ }
+
+ final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ break;
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ return;
+ }
+
+ if (req.getUniqKey() != null) {
+ indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
+ if (indexFile == null) {
+ log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
+ return;
+ }
+ }
+
+ if (keys != null && keys.length() > 0) {
+ String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
+ for (int i = 0; i < keyset.length; i++) {
+ String key = keyset[i];
+ if (key.length() > 0) {
+ indexFile = putKey(indexFile, msg, buildKey(topic, key));
+ if (indexFile == null) {
+ log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
+ return;
+ }
+ }
+ }
+ }
+ } else {
+ log.error("build index error, stop building index");
+ }
+ }
+
+
+ private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
+ for (boolean ok =
+ indexFile.putKey(idxKey, msg.getCommitLogOffset(),
+ msg.getStoreTimestamp()); !ok; ) {
+ log.warn("index file full, so create another one, " + indexFile.getFileName());
+ indexFile = retryGetAndCreateIndexFile();
+ if (null == indexFile) {
+ return null;
+ }
+
+ ok =
+ indexFile.putKey(idxKey, msg.getCommitLogOffset(),
+ msg.getStoreTimestamp());
+ }
+ return indexFile;
+ }
+
+
+ public IndexFile retryGetAndCreateIndexFile() {
+ IndexFile indexFile = null;
+
+
+ for (int times = 0; null == indexFile && times < 3; times++) {
+ indexFile = this.getAndCreateLastIndexFile();
+ if (null != indexFile)
+ break;
+
+ try {
+ log.error("try to create index file, " + times + " times");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ if (null == indexFile) {
+ this.defaultMessageStore.getAccessRights().makeIndexFileError();
+ log.error("mark index file can not build flag");
+ }
+
+ return indexFile;
+ }
+
+
+ public IndexFile getAndCreateLastIndexFile() {
+ IndexFile indexFile = null;
+ IndexFile prevIndexFile = null;
+ long lastUpdateEndPhyOffset = 0;
+ long lastUpdateIndexTimestamp = 0;
+
+ {
+ this.readWriteLock.readLock().lock();
+ if (!this.indexFileList.isEmpty()) {
+ IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
+ if (!tmp.isWriteFull()) {
+ indexFile = tmp;
+ } else {
+ lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
+ lastUpdateIndexTimestamp = tmp.getEndTimestamp();
+ prevIndexFile = tmp;
+ }
+ }
+
+ this.readWriteLock.readLock().unlock();
+ }
+
+
+ if (indexFile == null) {
+ try {
+ String fileName =
+ this.storePath + File.separator
+ + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
+ indexFile =
+ new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
+ lastUpdateIndexTimestamp);
+ this.readWriteLock.writeLock().lock();
+ this.indexFileList.add(indexFile);
+ } catch (Exception e) {
+ log.error("getLastIndexFile exception ", e);
+ } finally {
+ this.readWriteLock.writeLock().unlock();
+ }
+
+
+ if (indexFile != null) {
+ final IndexFile flushThisFile = prevIndexFile;
+ Thread flushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ IndexService.this.flush(flushThisFile);
+ }
+ }, "FlushIndexFileThread");
+
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+ }
+
+ return indexFile;
+ }
+
+
+ public void flush(final IndexFile f) {
+ if (null == f)
+ return;
+
+ long indexMsgTimestamp = 0;
+
+ if (f.isWriteFull()) {
+ indexMsgTimestamp = f.getEndTimestamp();
+ }
+
+ f.flush();
+
+ if (indexMsgTimestamp > 0) {
+ this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
+ this.defaultMessageStore.getStoreCheckpoint().flush();
+ }
+ }
+
+
+ public void start() {
+
+ }
+
+
+ public void shutdown() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
new file mode 100644
index 0000000..89d0755
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryOffsetResult {
+ private final List<Long> phyOffsets;
+ private final long indexLastUpdateTimestamp;
+ private final long indexLastUpdatePhyoffset;
+
+
+ public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp,
+ long indexLastUpdatePhyoffset) {
+ this.phyOffsets = phyOffsets;
+ this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+ this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
+ }
+
+
+ public List<Long> getPhyOffsets() {
+ return phyOffsets;
+ }
+
+
+ public long getIndexLastUpdateTimestamp() {
+ return indexLastUpdateTimestamp;
+ }
+
+
+ public long getIndexLastUpdatePhyoffset() {
+ return indexLastUpdatePhyoffset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
new file mode 100644
index 0000000..5f4720d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class DelayOffsetSerializeWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+ new ConcurrentHashMap<Integer, Long>(32);
+
+
+ public ConcurrentHashMap<Integer, Long> getOffsetTable() {
+ return offsetTable;
+ }
+
+
+ public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
new file mode 100644
index 0000000..e243a7e
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.running.RunningStats;
+import com.alibaba.rocketmq.store.*;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ScheduleMessageService extends ConfigManager {
+ public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private static final long FIRST_DELAY_TIME = 1000L;
+ private static final long DELAY_FOR_A_WHILE = 100L;
+ private static final long DELAY_FOR_A_PERIOD = 10000L;
+
+ private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+ new ConcurrentHashMap<Integer, Long>(32);
+
+ private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+ new ConcurrentHashMap<Integer, Long>(32);
+
+ private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
+
+ private final DefaultMessageStore defaultMessageStore;
+
+ private int maxDelayLevel;
+
+
+ public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
+ this.defaultMessageStore = defaultMessageStore;
+ }
+
+ public static int queueId2DelayLevel(final int queueId) {
+ return queueId + 1;
+ }
+
+ public void buildRunningStats(HashMap<String, String> stats) {
+ Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Integer, Long> next = it.next();
+ int queueId = delayLevel2QueueId(next.getKey());
+ long delayOffset = next.getValue();
+ long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId);
+ String value = String.format("%d,%d", delayOffset, maxOffset);
+ String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
+ stats.put(key, value);
+ }
+ }
+
+ public static int delayLevel2QueueId(final int delayLevel) {
+ return delayLevel - 1;
+ }
+
+
+ private void updateOffset(int delayLevel, long offset) {
+ this.offsetTable.put(delayLevel, offset);
+ }
+
+
+ public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
+ Long time = this.delayLevelTable.get(delayLevel);
+ if (time != null) {
+ return time + storeTimestamp;
+ }
+
+ return storeTimestamp + 1000;
+ }
+
+
+ public void start() {
+
+ for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
+ Integer level = entry.getKey();
+ Long timeDelay = entry.getValue();
+ Long offset = this.offsetTable.get(level);
+ if (null == offset) {
+ offset = 0L;
+ }
+
+ if (timeDelay != null) {
+ this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
+ }
+ }
+
+ this.timer.scheduleAtFixedRate(new TimerTask() {
+
+ @Override
+ public void run() {
+ try {
+ ScheduleMessageService.this.persist();
+ } catch (Exception e) {
+ log.error("scheduleAtFixedRate flush exception", e);
+ }
+ }
+ }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
+ }
+
+
+ public void shutdown() {
+ this.timer.cancel();
+ }
+
+
+ public int getMaxDelayLevel() {
+ return maxDelayLevel;
+ }
+
+
+ public String encode() {
+ return this.encode(false);
+ }
+
+ public boolean load() {
+ boolean result = super.load();
+ result = result && this.parseDelayLevel();
+ return result;
+ }
+
+ @Override
+ public String configFilePath() {
+ return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
+ .getStorePathRootDir());
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
+ DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
+ if (delayOffsetSerializeWrapper != null) {
+ this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
+ }
+ }
+ }
+
+ public String encode(final boolean prettyFormat) {
+ DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
+ delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
+ return delayOffsetSerializeWrapper.toJson(prettyFormat);
+ }
+
+ public boolean parseDelayLevel() {
+ HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
+ timeUnitTable.put("s", 1000L);
+ timeUnitTable.put("m", 1000L * 60);
+ timeUnitTable.put("h", 1000L * 60 * 60);
+ timeUnitTable.put("d", 1000L * 60 * 60 * 24);
+
+ String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
+ try {
+ String[] levelArray = levelString.split(" ");
+ for (int i = 0; i < levelArray.length; i++) {
+ String value = levelArray[i];
+ String ch = value.substring(value.length() - 1);
+ Long tu = timeUnitTable.get(ch);
+
+ int level = i + 1;
+ if (level > this.maxDelayLevel) {
+ this.maxDelayLevel = level;
+ }
+ long num = Long.parseLong(value.substring(0, value.length() - 1));
+ long delayTimeMillis = tu * num;
+ this.delayLevelTable.put(level, delayTimeMillis);
+ }
+ } catch (Exception e) {
+ log.error("parseDelayLevel exception", e);
+ log.info("levelString String = {}", levelString);
+ return false;
+ }
+
+ return true;
+ }
+
+ class DeliverDelayedMessageTimerTask extends TimerTask {
+ private final int delayLevel;
+ private final long offset;
+
+
+ public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+ this.delayLevel = delayLevel;
+ this.offset = offset;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ this.executeOnTimeup();
+ } catch (Exception e) {
+ // XXX: warn and notify me
+ log.error("ScheduleMessageService, executeOnTimeup exception", e);
+ ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
+ this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+ }
+ }
+
+
+ /**
+
+ *
+ * @return
+ */
+ private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+
+ long result = deliverTimestamp;
+
+ long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+ if (deliverTimestamp > maxTimestamp) {
+ result = now;
+ }
+
+ return result;
+ }
+
+
+ public void executeOnTimeup() {
+ ConsumeQueue cq =
+ ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
+ delayLevel2QueueId(delayLevel));
+
+ long failScheduleOffset = offset;
+
+ if (cq != null) {
+ SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+ if (bufferCQ != null) {
+ try {
+ long nextOffset = offset;
+ int i = 0;
+ for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ long offsetPy = bufferCQ.getByteBuffer().getLong();
+ int sizePy = bufferCQ.getByteBuffer().getInt();
+ long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+
+ long now = System.currentTimeMillis();
+ long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+
+ nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+ long countdown = deliverTimestamp - now;
+
+ if (countdown <= 0) {
+ MessageExt msgExt =
+ ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
+ offsetPy, sizePy);
+
+ if (msgExt != null) {
+ try {
+ MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
+ PutMessageResult putMessageResult =
+ ScheduleMessageService.this.defaultMessageStore
+ .putMessage(msgInner);
+
+ if (putMessageResult != null
+ && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ continue;
+ }
+
+ else {
+ // XXX: warn and notify me
+ log.error(
+ "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
+ msgExt.getTopic(), msgExt.getMsgId());
+ ScheduleMessageService.this.timer.schedule(
+ new DeliverDelayedMessageTimerTask(this.delayLevel,
+ nextOffset), DELAY_FOR_A_PERIOD);
+ ScheduleMessageService.this.updateOffset(this.delayLevel,
+ nextOffset);
+ return;
+ }
+ } catch (Exception e) {
+ /*
+ * XXX: warn and notify me
+
+
+
+ */
+ log.error(
+ "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ + offsetPy + ",sizePy=" + sizePy, e);
+ }
+ }
+ }
+
+ else {
+ ScheduleMessageService.this.timer.schedule(
+ new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
+ countdown);
+ ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+ return;
+ }
+ } // end of for
+
+ nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
+ this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+ ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+ return;
+ } finally {
+
+ bufferCQ.release();
+ }
+ } // end of if (bufferCQ != null)
+ else {
+ /*
+
+
+ */
+ long cqMinOffset = cq.getMinOffsetInQuque();
+ if (offset < cqMinOffset) {
+ failScheduleOffset = cqMinOffset;
+ log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ + cqMinOffset + ", queueId=" + cq.getQueueId());
+ }
+ }
+ } // end of if (cq != null)
+
+ ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
+ failScheduleOffset), DELAY_FOR_A_WHILE);
+ }
+
+
+ private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setBody(msgExt.getBody());
+ msgInner.setFlag(msgExt.getFlag());
+ MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+
+ TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
+ long tagsCodeValue =
+ MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+ msgInner.setTagsCode(tagsCodeValue);
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+
+ msgInner.setSysFlag(msgExt.getSysFlag());
+ msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+ msgInner.setBornHost(msgExt.getBornHost());
+ msgInner.setStoreHost(msgExt.getStoreHost());
+ msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+
+ msgInner.setWaitStoreMsgOK(false);
+ MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+
+ msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+
+ String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
+ int queueId = Integer.parseInt(queueIdStr);
+ msgInner.setQueueId(queueId);
+
+ return msgInner;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
new file mode 100644
index 0000000..539d4be
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.stats;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerStats {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final DefaultMessageStore defaultMessageStore;
+
+ private volatile long msgPutTotalYesterdayMorning;
+
+ private volatile long msgPutTotalTodayMorning;
+
+ private volatile long msgGetTotalYesterdayMorning;
+
+ private volatile long msgGetTotalTodayMorning;
+
+
+ public BrokerStats(DefaultMessageStore defaultMessageStore) {
+ this.defaultMessageStore = defaultMessageStore;
+ }
+
+
+ /**
+
+ */
+ public void record() {
+ this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
+ this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
+
+ this.msgPutTotalTodayMorning =
+ this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+ this.msgGetTotalTodayMorning =
+ this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+
+ log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
+ log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
+ }
+
+
+ public long getMsgPutTotalYesterdayMorning() {
+ return msgPutTotalYesterdayMorning;
+ }
+
+
+ public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+ this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+ }
+
+
+ public long getMsgPutTotalTodayMorning() {
+ return msgPutTotalTodayMorning;
+ }
+
+
+ public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+ this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+ }
+
+
+ public long getMsgGetTotalYesterdayMorning() {
+ return msgGetTotalYesterdayMorning;
+ }
+
+
+ public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+ this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+ }
+
+
+ public long getMsgGetTotalTodayMorning() {
+ return msgGetTotalTodayMorning;
+ }
+
+
+ public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+ this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+ }
+
+
+ public long getMsgPutTotalTodayNow() {
+ return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+ }
+
+
+ public long getMsgGetTotalTodayNow() {
+ return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+ }
+}