You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:25 UTC
[08/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
new file mode 100644
index 0000000..b5ed3f3
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class StoreStatsService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    /** Timeout in milliseconds handed to {@code waitForRunning} between two sampling rounds. */
+    private static final int FREQUENCY_OF_SAMPLING = 1000;
+
+    /** Every sample list is trimmed to at most {@code MAX_RECORDS_OF_SAMPLING + 1} snapshots. */
+    private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;
+
+    /** Labels of the put-latency buckets, in bucket order. */
+    private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[]{
+        "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
+    };
+
+    /**
+     * Exclusive upper bound of every bucket except the last one, which is open-ended.
+     * Entry {@code i} belongs to label {@code i} of {@link #PUT_MESSAGE_ENTIRE_TIME_MAX_DESC};
+     * the labels express these bounds in milliseconds.
+     */
+    private static final long[] PUT_MESSAGE_ENTIRE_TIME_BOUNDS = {
+        1, 10, 50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000,
+    };
+
+    /** Seconds between two TPS log lines; also the number of samples the logged TPS window spans. */
+    private static int printTPSInterval = 60 * 1;
+
+    private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
+
+    // Declared with the concrete type so that the atomic putIfAbsent is available on every Java version.
+    private final ConcurrentHashMap<String, AtomicLong> putMessageTopicTimesTotal =
+        new ConcurrentHashMap<String, AtomicLong>(128);
+    private final ConcurrentHashMap<String, AtomicLong> putMessageTopicSizeTotal =
+        new ConcurrentHashMap<String, AtomicLong>(128);
+
+    private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
+    private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
+    private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0);
+
+    // The four sample lists below are only touched while holding lockSampling.
+    private final LinkedList<CallSnapshot> putTimesList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> getTimesFoundList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();
+
+    private volatile AtomicLong[] putMessageDistributeTime;
+    private final long messageStoreBootTimestamp = System.currentTimeMillis();
+    private volatile long putMessageEntireTimeMax = 0;
+    private volatile long getMessageEntireTimeMax = 0;
+    // for putMessageEntireTimeMax
+    private final ReentrantLock lockPut = new ReentrantLock();
+    // for getMessageEntireTimeMax
+    private final ReentrantLock lockGet = new ReentrantLock();
+
+    private volatile long dispatchMaxBuffer = 0;
+
+    private final ReentrantLock lockSampling = new ReentrantLock();
+    private long lastPrintTimestamp = System.currentTimeMillis();
+
+
+    public StoreStatsService() {
+        this.initPutMessageDistributeTime();
+    }
+
+    /**
+     * Installs a fresh, zeroed set of latency bucket counters.
+     *
+     * @return the counters that were installed before the call, or {@code null} on the first call
+     */
+    private AtomicLong[] initPutMessageDistributeTime() {
+        final AtomicLong[] next = new AtomicLong[PUT_MESSAGE_ENTIRE_TIME_MAX_DESC.length];
+        for (int i = 0; i < next.length; i++) {
+            next[i] = new AtomicLong(0);
+        }
+
+        final AtomicLong[] old = this.putMessageDistributeTime;
+        this.putMessageDistributeTime = next;
+        return old;
+    }
+
+    /** Maps a put latency onto the index of its bucket in {@link #PUT_MESSAGE_ENTIRE_TIME_MAX_DESC}. */
+    private static int bucketIndexOf(long value) {
+        for (int i = 0; i < PUT_MESSAGE_ENTIRE_TIME_BOUNDS.length; i++) {
+            if (value < PUT_MESSAGE_ENTIRE_TIME_BOUNDS[i]) {
+                return i;
+            }
+        }
+        return PUT_MESSAGE_ENTIRE_TIME_BOUNDS.length;
+    }
+
+    public long getPutMessageEntireTimeMax() {
+        return putMessageEntireTimeMax;
+    }
+
+    /**
+     * Records one put latency: increments the matching bucket counter and raises the
+     * running maximum when {@code value} exceeds it.
+     *
+     * @param value elapsed time of one put; the bucket labels express it in milliseconds
+     */
+    public void setPutMessageEntireTimeMax(long value) {
+        final AtomicLong[] times = this.putMessageDistributeTime;
+        if (null == times) {
+            return;
+        }
+
+        times[bucketIndexOf(value)].incrementAndGet();
+
+        // Unlocked pre-check keeps the common case lock-free; re-checked under the lock.
+        if (value > this.putMessageEntireTimeMax) {
+            this.lockPut.lock();
+            try {
+                if (value > this.putMessageEntireTimeMax) {
+                    this.putMessageEntireTimeMax = value;
+                }
+            } finally {
+                this.lockPut.unlock();
+            }
+        }
+    }
+
+
+    public long getGetMessageEntireTimeMax() {
+        return getMessageEntireTimeMax;
+    }
+
+
+    /**
+     * Raises the running maximum of get latencies when {@code value} exceeds it.
+     */
+    public void setGetMessageEntireTimeMax(long value) {
+        if (value > this.getMessageEntireTimeMax) {
+            this.lockGet.lock();
+            try {
+                if (value > this.getMessageEntireTimeMax) {
+                    this.getMessageEntireTimeMax = value;
+                }
+            } finally {
+                this.lockGet.unlock();
+            }
+        }
+    }
+
+
+    public long getDispatchMaxBuffer() {
+        return dispatchMaxBuffer;
+    }
+
+
+    /** Keeps the larger of the current maximum and {@code value}; performed without a lock. */
+    public void setDispatchMaxBuffer(long value) {
+        this.dispatchMaxBuffer = value > this.dispatchMaxBuffer ? value : this.dispatchMaxBuffer;
+    }
+
+
+    /**
+     * Renders all statistics as tab-indented, CRLF-terminated lines.
+     * A put count of zero is reported as 1 so the average size division stays defined.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder(1024);
+        long totalTimes = getPutMessageTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+
+        sb.append("\truntime: ").append(this.getFormatRuntime()).append("\r\n");
+        sb.append("\tputMessageEntireTimeMax: ").append(this.putMessageEntireTimeMax).append("\r\n");
+        sb.append("\tputMessageTimesTotal: ").append(totalTimes).append("\r\n");
+        sb.append("\tputMessageSizeTotal: ").append(this.getPutMessageSizeTotal()).append("\r\n");
+        sb.append("\tputMessageDistributeTime: ")
+            .append(this.getPutMessageDistributeTimeStringInfo(totalTimes)).append("\r\n");
+        sb.append("\tputMessageAverageSize: ")
+            .append(this.getPutMessageSizeTotal() / (double) totalTimes).append("\r\n");
+        sb.append("\tdispatchMaxBuffer: ").append(this.dispatchMaxBuffer).append("\r\n");
+        sb.append("\tgetMessageEntireTimeMax: ").append(this.getMessageEntireTimeMax).append("\r\n");
+        sb.append("\tputTps: ").append(this.getPutTps()).append("\r\n");
+        sb.append("\tgetFoundTps: ").append(this.getGetFoundTps()).append("\r\n");
+        sb.append("\tgetMissTps: ").append(this.getGetMissTps()).append("\r\n");
+        sb.append("\tgetTotalTps: ").append(this.getGetTotalTps()).append("\r\n");
+        sb.append("\tgetTransferedTps: ").append(this.getGetTransferedTps()).append("\r\n");
+        return sb.toString();
+    }
+
+    /** @return the sum of the per-topic put counters */
+    public long getPutMessageTimesTotal() {
+        long rs = 0;
+        for (AtomicLong data : putMessageTopicTimesTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    /** Formats the time elapsed since this object was created as days/hours/minutes/seconds. */
+    private String getFormatRuntime() {
+        final long second = 1000;
+        final long minute = 60 * second;
+        final long hour = 60 * minute;
+        final long day = 24 * hour;
+        final MessageFormat messageFormat = new MessageFormat("[ {0} days, {1} hours, {2} minutes, {3} seconds ]");
+
+        final long time = System.currentTimeMillis() - this.messageStoreBootTimestamp;
+        final long days = time / day;
+        final long hours = (time % day) / hour;
+        final long minutes = (time % hour) / minute;
+        final long seconds = (time % minute) / second;
+        return messageFormat.format(new Long[]{days, hours, minutes, seconds});
+    }
+
+    /** @return the sum of the per-topic put size counters */
+    public long getPutMessageSizeTotal() {
+        long rs = 0;
+        for (AtomicLong data : putMessageTopicSizeTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    // The total is accepted for signature compatibility but does not influence the output.
+    private String getPutMessageDistributeTimeStringInfo(Long total) {
+        return this.putMessageDistributeTimeToString();
+    }
+
+    // The no-arg variants below report the 10, 60 and 600 sample windows, space separated.
+
+    private String getPutTps() {
+        return this.getPutTps(10) + " " + this.getPutTps(60) + " " + this.getPutTps(600);
+    }
+
+    private String getGetFoundTps() {
+        return this.getGetFoundTps(10) + " " + this.getGetFoundTps(60) + " " + this.getGetFoundTps(600);
+    }
+
+    private String getGetMissTps() {
+        return this.getGetMissTps(10) + " " + this.getGetMissTps(60) + " " + this.getGetMissTps(600);
+    }
+
+    private String getGetTotalTps() {
+        return this.getGetTotalTps(10) + " " + this.getGetTotalTps(60) + " " + this.getGetTotalTps(600);
+    }
+
+    private String getGetTransferedTps() {
+        return this.getGetTransferedTps(10) + " " + this.getGetTransferedTps(60) + " "
+            + this.getGetTransferedTps(600);
+    }
+
+    /** Renders the current bucket counters as {@code label:count} pairs, each followed by a space. */
+    private String putMessageDistributeTimeToString() {
+        final AtomicLong[] times = this.putMessageDistributeTime;
+        if (null == times) {
+            return null;
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < times.length; i++) {
+            sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], times[i].get()));
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Tells whether {@code samples} holds enough snapshots to span {@code time} intervals.
+     * An empty list never qualifies, so callers can safely read the last element afterwards.
+     */
+    private static boolean hasWindow(LinkedList<CallSnapshot> samples, int time) {
+        return time >= 0 && samples.size() > time;
+    }
+
+    /** TPS between the newest snapshot and the one {@code time} entries before it; requires {@link #hasWindow}. */
+    private static double windowTps(LinkedList<CallSnapshot> samples, int time) {
+        final CallSnapshot lastBefore = samples.get(samples.size() - (time + 1));
+        return CallSnapshot.getTPS(lastBefore, samples.getLast());
+    }
+
+    /**
+     * Computes the TPS of one sample list under the sampling lock.
+     *
+     * @return the TPS as text, or an empty string while too few samples have been collected
+     */
+    private String tpsAsString(LinkedList<CallSnapshot> samples, int time) {
+        this.lockSampling.lock();
+        try {
+            return hasWindow(samples, time) ? String.valueOf(windowTps(samples, time)) : "";
+        } finally {
+            this.lockSampling.unlock();
+        }
+    }
+
+    private String getPutTps(int time) {
+        return tpsAsString(this.putTimesList, time);
+    }
+
+    private String getGetFoundTps(int time) {
+        return tpsAsString(this.getTimesFoundList, time);
+    }
+
+    private String getGetMissTps(int time) {
+        return tpsAsString(this.getTimesMissList, time);
+    }
+
+    /** Sum of found and miss TPS; a side with too few samples contributes 0. */
+    private String getGetTotalTps(int time) {
+        double found = 0;
+        double miss = 0;
+        this.lockSampling.lock();
+        try {
+            if (hasWindow(this.getTimesFoundList, time)) {
+                found = windowTps(this.getTimesFoundList, time);
+            }
+            if (hasWindow(this.getTimesMissList, time)) {
+                miss = windowTps(this.getTimesMissList, time);
+            }
+        } finally {
+            this.lockSampling.unlock();
+        }
+
+        return Double.toString(found + miss);
+    }
+
+    private String getGetTransferedTps(int time) {
+        return tpsAsString(this.transferedMsgCountList, time);
+    }
+
+    /**
+     * Exposes the same figures as {@link #toString()} as key/value pairs, plus {@code bootTimestamp}.
+     *
+     * @return a new mutable map owned by the caller
+     */
+    public HashMap<String, String> getRuntimeInfo() {
+        final HashMap<String, String> result = new HashMap<String, String>(64);
+
+        long totalTimes = getPutMessageTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+
+        result.put("bootTimestamp", String.valueOf(this.messageStoreBootTimestamp));
+        result.put("runtime", this.getFormatRuntime());
+        result.put("putMessageEntireTimeMax", String.valueOf(this.putMessageEntireTimeMax));
+        result.put("putMessageTimesTotal", String.valueOf(totalTimes));
+        result.put("putMessageSizeTotal", String.valueOf(this.getPutMessageSizeTotal()));
+        result.put("putMessageDistributeTime",
+            String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes)));
+        result.put("putMessageAverageSize",
+            String.valueOf(this.getPutMessageSizeTotal() / (double) totalTimes));
+        result.put("dispatchMaxBuffer", String.valueOf(this.dispatchMaxBuffer));
+        result.put("getMessageEntireTimeMax", String.valueOf(this.getMessageEntireTimeMax));
+        result.put("putTps", this.getPutTps());
+        result.put("getFoundTps", this.getGetFoundTps());
+        result.put("getMissTps", this.getGetMissTps());
+        result.put("getTotalTps", this.getGetTotalTps());
+        result.put("getTransferedTps", this.getGetTransferedTps());
+
+        return result;
+    }
+
+    /** Service loop: wait, take a sample, periodically log TPS; exceptions are logged and the loop continues. */
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(FREQUENCY_OF_SAMPLING);
+
+                this.sampling();
+
+                this.printTps();
+            } catch (Exception e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+    @Override
+    public String getServiceName() {
+        return StoreStatsService.class.getSimpleName();
+    }
+
+    /** Appends one snapshot and drops the oldest once the list exceeds its capacity. */
+    private static void record(LinkedList<CallSnapshot> samples, long timestamp, long total) {
+        samples.add(new CallSnapshot(timestamp, total));
+        if (samples.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+            samples.removeFirst();
+        }
+    }
+
+    /** Takes one snapshot of each of the four counters under the sampling lock. */
+    private void sampling() {
+        this.lockSampling.lock();
+        try {
+            record(this.putTimesList, System.currentTimeMillis(), getPutMessageTimesTotal());
+            record(this.getTimesFoundList, System.currentTimeMillis(), this.getMessageTimesTotalFound.get());
+            record(this.getTimesMissList, System.currentTimeMillis(), this.getMessageTimesTotalMiss.get());
+            record(this.transferedMsgCountList, System.currentTimeMillis(),
+                this.getMessageTransferedMsgCount.get());
+        } finally {
+            this.lockSampling.unlock();
+        }
+    }
+
+    /**
+     * Once per {@code printTPSInterval} seconds, logs the TPS figures and the latency
+     * distribution gathered since the previous print; the bucket counters are reset as a side effect.
+     */
+    private void printTps() {
+        if (System.currentTimeMillis() > (this.lastPrintTimestamp + printTPSInterval * 1000L)) {
+            this.lastPrintTimestamp = System.currentTimeMillis();
+
+            log.info("[STORETPS] put_tps {} get_found_tps {} get_miss_tps {} get_transfered_tps {}",
+                this.getPutTps(printTPSInterval),
+                this.getGetFoundTps(printTPSInterval),
+                this.getGetMissTps(printTPSInterval),
+                this.getGetTransferedTps(printTPSInterval)
+            );
+
+            // Swap in fresh counters and report the ones that were just retired.
+            final AtomicLong[] times = this.initPutMessageDistributeTime();
+            if (null == times) {
+                return;
+            }
+
+            final StringBuilder sb = new StringBuilder();
+            long totalPut = 0;
+            for (int i = 0; i < times.length; i++) {
+                final long value = times[i].get();
+                totalPut += value;
+                sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
+                sb.append(" ");
+            }
+
+            log.info("[PAGECACHERT] TotalPut {}, PutMessageDistributeTime {}", totalPut, sb.toString());
+        }
+    }
+
+    public AtomicLong getGetMessageTimesTotalFound() {
+        return getMessageTimesTotalFound;
+    }
+
+
+    public AtomicLong getGetMessageTimesTotalMiss() {
+        return getMessageTimesTotalMiss;
+    }
+
+
+    public AtomicLong getGetMessageTransferedMsgCount() {
+        return getMessageTransferedMsgCount;
+    }
+
+
+    public AtomicLong getPutMessageFailedTimes() {
+        return putMessageFailedTimes;
+    }
+
+
+    /**
+     * Returns the counter registered for {@code topic}, registering a zeroed one if absent.
+     * Registration is atomic, so concurrent first callers all receive the same instance.
+     */
+    private static AtomicLong counterFor(ConcurrentHashMap<String, AtomicLong> counters, String topic) {
+        AtomicLong rs = counters.get(topic);
+        if (null == rs) {
+            final AtomicLong created = new AtomicLong(0);
+            final AtomicLong raced = counters.putIfAbsent(topic, created);
+            rs = (null != raced) ? raced : created;
+        }
+        return rs;
+    }
+
+
+    /** @return the put size counter of {@code topic}, created on first use */
+    public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
+        return counterFor(putMessageTopicSizeTotal, topic);
+    }
+
+
+    /** @return the put count counter of {@code topic}, created on first use */
+    public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
+        return counterFor(putMessageTopicTimesTotal, topic);
+    }
+
+
+    public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
+        return putMessageTopicTimesTotal;
+    }
+
+
+    public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
+        return putMessageTopicSizeTotal;
+    }
+
+    /** Immutable pair of a wall-clock timestamp (ms) and the counter value observed at that time. */
+    static class CallSnapshot {
+        public final long timestamp;
+        public final long callTimesTotal;
+
+
+        public CallSnapshot(long timestamp, long callTimesTotal) {
+            this.timestamp = timestamp;
+            this.callTimesTotal = callTimesTotal;
+        }
+
+
+        /**
+         * Average calls per second between two snapshots.
+         *
+         * @return the rate, or 0 when no time elapsed between the snapshots (or the clock
+         *         went backwards), instead of NaN, an infinity or a negative rate
+         */
+        public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
+            final long total = end.callTimesTotal - begin.callTimesTotal;
+            final long elapsed = end.timestamp - begin.timestamp;
+            if (elapsed <= 0) {
+                return 0;
+            }
+
+            final double perMillisecond = total / (double) elapsed;
+            return perMillisecond * 1000;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
new file mode 100644
index 0000000..9e0b565
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class StoreUtil {
+    /**
+     * Value reported when the platform MXBean cannot tell the physical memory size: 24 GiB.
+     * The leading {@code long} operand matters: written as {@code 1024 * 1024 * 1024 * 24}
+     * the product is computed in {@code int} arithmetic and wraps around to 0.
+     */
+    static final long DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE = 24L * 1024 * 1024 * 1024;
+
+    /** Physical memory size in bytes, resolved once when this class is initialized. */
+    public static final long TOTAL_PHYSICAL_MEMORY_SIZE = getTotalPhysicalMemorySize();
+
+
+    /**
+     * Queries the operating system MXBean for the machine's physical memory.
+     *
+     * @return the size in bytes reported by {@code com.sun.management.OperatingSystemMXBean},
+     *         or {@link #DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE} when the JVM does not provide that bean
+     */
+    @SuppressWarnings("restriction")
+    public static long getTotalPhysicalMemorySize() {
+        long physicalTotal = DEFAULT_TOTAL_PHYSICAL_MEMORY_SIZE;
+        OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
+        if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
+            physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
+        }
+
+        return physicalTotal;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
new file mode 100644
index 0000000..8abe7e9
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import com.alibaba.rocketmq.store.util.LibC;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * @author xinyuzhou.zxy
+ */
+public class TransientStorePool {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final int poolSize;
+    private final int fileSize;
+    private final Deque<ByteBuffer> availableBuffers;
+    private final MessageStoreConfig storeConfig;
+
+    /**
+     * Reads the pool size and the per-buffer size from the configuration.
+     * No buffer is allocated until {@link #init()} is called.
+     */
+    public TransientStorePool(final MessageStoreConfig storeConfig) {
+        this.storeConfig = storeConfig;
+        this.poolSize = storeConfig.getTransientStorePoolSize();
+        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
+        this.availableBuffers = new ConcurrentLinkedDeque<>();
+    }
+
+    /** Wraps the native address of a direct buffer so it can be handed to the libc bindings. */
+    private static Pointer pointerTo(ByteBuffer byteBuffer) {
+        return new Pointer(((DirectBuffer) byteBuffer).address());
+    }
+
+    /**
+     * It's a heavy init method: allocates {@code poolSize} direct buffers of {@code fileSize}
+     * bytes, passes each one to {@code mlock} and adds it to the pool.
+     */
+    public void init() {
+        for (int i = 0; i < poolSize; i++) {
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
+
+            LibC.INSTANCE.mlock(pointerTo(byteBuffer), new NativeLong(fileSize));
+
+            availableBuffers.offer(byteBuffer);
+        }
+    }
+
+    /** Passes every buffer currently sitting in the pool to {@code munlock}; borrowed buffers are not visited. */
+    public void destroy() {
+        for (ByteBuffer byteBuffer : availableBuffers) {
+            LibC.INSTANCE.munlock(pointerTo(byteBuffer), new NativeLong(fileSize));
+        }
+    }
+
+    /** Resets position and limit of the buffer and puts it back at the head of the pool. */
+    public void returnBuffer(ByteBuffer byteBuffer) {
+        byteBuffer.position(0);
+        byteBuffer.limit(fileSize);
+        this.availableBuffers.offerFirst(byteBuffer);
+    }
+
+    /**
+     * Takes a buffer from the head of the pool and warns when fewer than 40% of the
+     * configured buffers remain.
+     *
+     * @return a pooled buffer, or {@code null} when the pool is empty
+     */
+    public ByteBuffer borrowBuffer() {
+        ByteBuffer buffer = availableBuffers.pollFirst();
+        // size() walks the whole deque, so read it once; this also guarantees that the
+        // logged figure is the one that triggered the warning.
+        final int remaining = availableBuffers.size();
+        if (remaining < poolSize * 0.4) {
+            log.warn("TransientStorePool only remain {} sheets.", remaining);
+        }
+        return buffer;
+    }
+
+    /**
+     * @return the number of pooled buffers, or {@link Integer#MAX_VALUE} when the
+     *         transient store pool is disabled in the configuration
+     */
+    public int remainBufferNumbs() {
+        if (storeConfig.isTransientStorePoolEnable()) {
+            return availableBuffers.size();
+        }
+        return Integer.MAX_VALUE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
new file mode 100644
index 0000000..06714b2
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+/**
+ * @author shijia.wxr
+ */
+public enum BrokerRole {
+ ASYNC_MASTER,
+ SYNC_MASTER,
+ SLAVE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
new file mode 100644
index 0000000..48ae4b2
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+/**
+ * @author shijia.wxr
+ */
+public enum FlushDiskType {
+    // NOTE(review): presumably selects synchronous vs. asynchronous flushing to disk;
+    // confirm against the code that consumes this enum.
+    SYNC_FLUSH, ASYNC_FLUSH
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
new file mode 100644
index 0000000..138bc2e
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.store.ConsumeQueue;
+
+import java.io.File;
+
+
+/**
+ * @author vongosling
+ * @author shijia.wxr
+ */
+public class MessageStoreConfig {
+ //The root directory in which the log data is kept
+ @ImportantField
+ private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+
+ //The directory in which the commitlog is kept
+ @ImportantField
+ private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ + File.separator + "commitlog";
+
+ // CommitLog file size,default is 1G
+ private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
+ // ConsumeQueue file size,default holds 300,000 store units (300000 * CQ_STORE_UNIT_SIZE bytes)
+ private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+
+ // CommitLog flush interval
+ // flush data to disk
+ @ImportantField
+ private int flushIntervalCommitLog = 500;
+
+ // Only used if TransientStorePool enabled
+ // flush data to FileChannel
+ @ImportantField
+ private int commitIntervalCommitLog = 200;
+
+ private boolean useReentrantLockWhenPutMessage = false;
+
+ // Whether schedule flush,default is real-time
+ @ImportantField
+ private boolean flushCommitLogTimed = false;
+ // ConsumeQueue flush interval
+ private int flushIntervalConsumeQueue = 1000;
+ // Resource reclaim interval
+ private int cleanResourceInterval = 10000;
+ // CommitLog removal interval
+ private int deleteCommitLogFilesInterval = 100;
+ // ConsumeQueue removal interval
+ private int deleteConsumeQueueFilesInterval = 100;
+ private int destroyMapedFileIntervalForcibly = 1000 * 120;
+ private int redeleteHangedFileInterval = 1000 * 120;
+ // When to delete,default is at 4 am
+ @ImportantField
+ private String deleteWhen = "04";
+ private int diskMaxUsedSpaceRatio = 75;
+ // The number of hours to keep a log file before deleting it (in hours)
+ @ImportantField
+ private int fileReservedTime = 72;
+ // Flow control for ConsumeQueue
+ private int putMsgIndexHightWater = 600000;
+ // The maximum size of a single message,default is 4M
+ private int maxMessageSize = 1024 * 1024 * 4;
+ // Whether check the CRC32 of the records consumed.
+ // This ensures no on-the-wire or on-disk corruption to the messages occurred.
+ // This check adds some overhead,so it may be disabled in cases seeking extreme performance.
+ private boolean checkCRCOnRecover = true;
+ // How many pages are to be flushed when flush CommitLog
+ private int flushCommitLogLeastPages = 4;
+ // How many pages are to be committed when commit data to file
+ private int commitCommitLogLeastPages = 4;
+ // Flush page size when the disk in warming state
+ private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
+ // How many pages are to be flushed when flush ConsumeQueue
+ private int flushConsumeQueueLeastPages = 2;
+ private int flushCommitLogThoroughInterval = 1000 * 10;
+ private int commitCommitLogThoroughInterval = 200;
+ private int flushConsumeQueueThoroughInterval = 1000 * 60;
+ @ImportantField
+ private int maxTransferBytesOnMessageInMemory = 1024 * 256;
+ @ImportantField
+ private int maxTransferCountOnMessageInMemory = 32;
+ @ImportantField
+ private int maxTransferBytesOnMessageInDisk = 1024 * 64;
+ @ImportantField
+ private int maxTransferCountOnMessageInDisk = 8;
+ @ImportantField
+ private int accessMessageInMemoryMaxRatio = 40;
+ @ImportantField
+ private boolean messageIndexEnable = true;
+ private int maxHashSlotNum = 5000000;
+ private int maxIndexNum = 5000000 * 4;
+ private int maxMsgsNumBatch = 64;
+ @ImportantField
+ private boolean messageIndexSafe = false;
+ private int haListenPort = 10912;
+ private int haSendHeartbeatInterval = 1000 * 5;
+ private int haHousekeepingInterval = 1000 * 20;
+ private int haTransferBatchSize = 1024 * 32;
+ @ImportantField
+ private String haMasterAddress = null;
+ private int haSlaveFallbehindMax = 1024 * 1024 * 256;
+ @ImportantField
+ private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
+ @ImportantField
+ private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
+ private int syncFlushTimeout = 1000 * 5;
+ private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
+ private long flushDelayOffsetInterval = 1000 * 10;
+ @ImportantField
+ private boolean cleanFileForciblyEnable = true;
+ private boolean warmMapedFileEnable = false;
+ private boolean offsetCheckInSlave = false;
+ private boolean debugLockEnable = false;
+ private boolean duplicationEnable = false;
+ private boolean diskFallRecorded = true;
+ private long osPageCacheBusyTimeOutMills = 1000;
+ private int defaultQueryMaxNum = 32;
+
+ @ImportantField
+ private boolean transientStorePoolEnable = false;
+ private int transientStorePoolSize = 5;
+ private boolean fastFailIfNoBufferInStorePool = false;
+
+ public boolean isDebugLockEnable() {
+ return debugLockEnable;
+ }
+
+ public void setDebugLockEnable(final boolean debugLockEnable) {
+ this.debugLockEnable = debugLockEnable;
+ }
+
+ public boolean isDuplicationEnable() {
+ return duplicationEnable;
+ }
+
+ public void setDuplicationEnable(final boolean duplicationEnable) {
+ this.duplicationEnable = duplicationEnable;
+ }
+
+ public long getOsPageCacheBusyTimeOutMills() {
+ return osPageCacheBusyTimeOutMills;
+ }
+
+ public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
+ this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
+ }
+
+ public boolean isDiskFallRecorded() {
+ return diskFallRecorded;
+ }
+
+ public void setDiskFallRecorded(final boolean diskFallRecorded) {
+ this.diskFallRecorded = diskFallRecorded;
+ }
+
+ public boolean isWarmMapedFileEnable() {
+ return warmMapedFileEnable;
+ }
+
+
+ public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
+ this.warmMapedFileEnable = warmMapedFileEnable;
+ }
+
+
+ public int getMapedFileSizeCommitLog() {
+ return mapedFileSizeCommitLog;
+ }
+
+
+ public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
+ this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
+ }
+
+
+ /**
+  * Returns the configured ConsumeQueue file size rounded up to a whole number of
+  * {@code ConsumeQueue.CQ_STORE_UNIT_SIZE}-sized units.
+  *
+  * @return the configured size, rounded up to a multiple of the store unit size
+  */
+ public int getMapedFileSizeConsumeQueue() {
+     // Divide in floating point so that Math.ceil can round a partial unit up.
+     final double unitSize = ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0;
+     final int unitCount = (int) Math.ceil(this.mapedFileSizeConsumeQueue / unitSize);
+     return (int) (unitCount * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ }
+
+
+ public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
+ this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
+ }
+
+
+ public int getFlushIntervalCommitLog() {
+ return flushIntervalCommitLog;
+ }
+
+
+ public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
+ this.flushIntervalCommitLog = flushIntervalCommitLog;
+ }
+
+
+ public int getFlushIntervalConsumeQueue() {
+ return flushIntervalConsumeQueue;
+ }
+
+
+ public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
+ this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
+ }
+
+
+ public int getPutMsgIndexHightWater() {
+ return putMsgIndexHightWater;
+ }
+
+
+ public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
+ this.putMsgIndexHightWater = putMsgIndexHightWater;
+ }
+
+
+ public int getCleanResourceInterval() {
+ return cleanResourceInterval;
+ }
+
+
+ public void setCleanResourceInterval(int cleanResourceInterval) {
+ this.cleanResourceInterval = cleanResourceInterval;
+ }
+
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+
+ public boolean isCheckCRCOnRecover() {
+ return checkCRCOnRecover;
+ }
+
+
+ /**
+ * Returns the same value as {@link #isCheckCRCOnRecover()}; a duplicate accessor using the "get" prefix.
+ *
+ * @return whether the CRC check on recovery is enabled
+ */
+ public boolean getCheckCRCOnRecover() {
+ return checkCRCOnRecover;
+ }
+
+
+ public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
+ this.checkCRCOnRecover = checkCRCOnRecover;
+ }
+
+
+ public String getStorePathCommitLog() {
+ return storePathCommitLog;
+ }
+
+
+ public void setStorePathCommitLog(String storePathCommitLog) {
+ this.storePathCommitLog = storePathCommitLog;
+ }
+
+
+ public String getDeleteWhen() {
+ return deleteWhen;
+ }
+
+
+ public void setDeleteWhen(String deleteWhen) {
+ this.deleteWhen = deleteWhen;
+ }
+
+
+ /**
+  * Returns the configured maximum disk usage ratio, clamped to the range [10, 95].
+  * The stored value itself is not modified.
+  *
+  * @return the configured ratio, but never less than 10 and never more than 95
+  */
+ public int getDiskMaxUsedSpaceRatio() {
+     final int lowerBound = 10;
+     final int upperBound = 95;
+     return Math.min(upperBound, Math.max(lowerBound, this.diskMaxUsedSpaceRatio));
+ }
+
+
+ public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
+ this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
+ }
+
+
+ public int getDeleteCommitLogFilesInterval() {
+ return deleteCommitLogFilesInterval;
+ }
+
+
+ public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
+ this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
+ }
+
+
+ public int getDeleteConsumeQueueFilesInterval() {
+ return deleteConsumeQueueFilesInterval;
+ }
+
+
+ public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
+ this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
+ }
+
+
+ public int getMaxTransferBytesOnMessageInMemory() {
+ return maxTransferBytesOnMessageInMemory;
+ }
+
+
+ public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
+ this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
+ }
+
+
+ public int getMaxTransferCountOnMessageInMemory() {
+ return maxTransferCountOnMessageInMemory;
+ }
+
+
+ public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
+ this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
+ }
+
+
+ public int getMaxTransferBytesOnMessageInDisk() {
+ return maxTransferBytesOnMessageInDisk;
+ }
+
+
+ public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
+ this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
+ }
+
+
+ public int getMaxTransferCountOnMessageInDisk() {
+ return maxTransferCountOnMessageInDisk;
+ }
+
+
+ public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
+ this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
+ }
+
+
+ public int getFlushCommitLogLeastPages() {
+ return flushCommitLogLeastPages;
+ }
+
+
+ public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
+ this.flushCommitLogLeastPages = flushCommitLogLeastPages;
+ }
+
+
+ public int getFlushConsumeQueueLeastPages() {
+ return flushConsumeQueueLeastPages;
+ }
+
+
+ public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
+ this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
+ }
+
+
+ public int getFlushCommitLogThoroughInterval() {
+ return flushCommitLogThoroughInterval;
+ }
+
+
+ public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
+ this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
+ }
+
+
+ public int getFlushConsumeQueueThoroughInterval() {
+ return flushConsumeQueueThoroughInterval;
+ }
+
+
+ public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
+ this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
+ }
+
+
+ public int getDestroyMapedFileIntervalForcibly() {
+ return destroyMapedFileIntervalForcibly;
+ }
+
+
+ public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
+ this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
+ }
+
+
+ public int getFileReservedTime() {
+ return fileReservedTime;
+ }
+
+
+ public void setFileReservedTime(int fileReservedTime) {
+ this.fileReservedTime = fileReservedTime;
+ }
+
+
+ public int getRedeleteHangedFileInterval() {
+ return redeleteHangedFileInterval;
+ }
+
+
+ public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
+ this.redeleteHangedFileInterval = redeleteHangedFileInterval;
+ }
+
+
+ public int getAccessMessageInMemoryMaxRatio() {
+ return accessMessageInMemoryMaxRatio;
+ }
+
+
+ public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
+ this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
+ }
+
+
+ public boolean isMessageIndexEnable() {
+ return messageIndexEnable;
+ }
+
+
+ public void setMessageIndexEnable(boolean messageIndexEnable) {
+ this.messageIndexEnable = messageIndexEnable;
+ }
+
+
+ public int getMaxHashSlotNum() {
+ return maxHashSlotNum;
+ }
+
+
+ public void setMaxHashSlotNum(int maxHashSlotNum) {
+ this.maxHashSlotNum = maxHashSlotNum;
+ }
+
+
+ public int getMaxIndexNum() {
+ return maxIndexNum;
+ }
+
+
+ public void setMaxIndexNum(int maxIndexNum) {
+ this.maxIndexNum = maxIndexNum;
+ }
+
+
+ public int getMaxMsgsNumBatch() {
+ return maxMsgsNumBatch;
+ }
+
+
+ public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
+ this.maxMsgsNumBatch = maxMsgsNumBatch;
+ }
+
+
+ public int getHaListenPort() {
+ return haListenPort;
+ }
+
+
+ public void setHaListenPort(int haListenPort) {
+ this.haListenPort = haListenPort;
+ }
+
+
+ public int getHaSendHeartbeatInterval() {
+ return haSendHeartbeatInterval;
+ }
+
+
+ public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
+ this.haSendHeartbeatInterval = haSendHeartbeatInterval;
+ }
+
+
+ public int getHaHousekeepingInterval() {
+ return haHousekeepingInterval;
+ }
+
+
+ public void setHaHousekeepingInterval(int haHousekeepingInterval) {
+ this.haHousekeepingInterval = haHousekeepingInterval;
+ }
+
+
+ public BrokerRole getBrokerRole() {
+ return brokerRole;
+ }
+
+ public void setBrokerRole(BrokerRole brokerRole) {
+ this.brokerRole = brokerRole;
+ }
+
+ /**
+ * Sets the broker role from the name of a {@code BrokerRole} constant.
+ * NOTE(review): assuming BrokerRole is an enum, an unknown name raises
+ * IllegalArgumentException and null raises NullPointerException - confirm.
+ *
+ * @param brokerRole constant name passed to {@code BrokerRole.valueOf}
+ */
+ public void setBrokerRole(String brokerRole) {
+ this.brokerRole = BrokerRole.valueOf(brokerRole);
+ }
+
+ public int getHaTransferBatchSize() {
+ return haTransferBatchSize;
+ }
+
+
+ public void setHaTransferBatchSize(int haTransferBatchSize) {
+ this.haTransferBatchSize = haTransferBatchSize;
+ }
+
+
+ public int getHaSlaveFallbehindMax() {
+ return haSlaveFallbehindMax;
+ }
+
+
+ public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
+ this.haSlaveFallbehindMax = haSlaveFallbehindMax;
+ }
+
+
+ public FlushDiskType getFlushDiskType() {
+ return flushDiskType;
+ }
+
+ public void setFlushDiskType(FlushDiskType flushDiskType) {
+ this.flushDiskType = flushDiskType;
+ }
+
+ /**
+ * Sets the flush strategy from the name of a {@link FlushDiskType} constant.
+ *
+ * @param type exact constant name, e.g. {@code "SYNC_FLUSH"} or {@code "ASYNC_FLUSH"}
+ * @throws IllegalArgumentException if {@code type} does not name a {@link FlushDiskType} constant
+ * @throws NullPointerException if {@code type} is null
+ */
+ public void setFlushDiskType(String type) {
+ this.flushDiskType = FlushDiskType.valueOf(type);
+ }
+
+ public int getSyncFlushTimeout() {
+ return syncFlushTimeout;
+ }
+
+
+ public void setSyncFlushTimeout(int syncFlushTimeout) {
+ this.syncFlushTimeout = syncFlushTimeout;
+ }
+
+
+ public String getHaMasterAddress() {
+ return haMasterAddress;
+ }
+
+
+ public void setHaMasterAddress(String haMasterAddress) {
+ this.haMasterAddress = haMasterAddress;
+ }
+
+
+ public String getMessageDelayLevel() {
+ return messageDelayLevel;
+ }
+
+
+ public void setMessageDelayLevel(String messageDelayLevel) {
+ this.messageDelayLevel = messageDelayLevel;
+ }
+
+
+ public long getFlushDelayOffsetInterval() {
+ return flushDelayOffsetInterval;
+ }
+
+
+ public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
+ this.flushDelayOffsetInterval = flushDelayOffsetInterval;
+ }
+
+
+ public boolean isCleanFileForciblyEnable() {
+ return cleanFileForciblyEnable;
+ }
+
+
+ public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
+ this.cleanFileForciblyEnable = cleanFileForciblyEnable;
+ }
+
+
+ public boolean isMessageIndexSafe() {
+ return messageIndexSafe;
+ }
+
+
+ public void setMessageIndexSafe(boolean messageIndexSafe) {
+ this.messageIndexSafe = messageIndexSafe;
+ }
+
+
+ public boolean isFlushCommitLogTimed() {
+ return flushCommitLogTimed;
+ }
+
+
+ public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
+ this.flushCommitLogTimed = flushCommitLogTimed;
+ }
+
+
+ public String getStorePathRootDir() {
+ return storePathRootDir;
+ }
+
+
+ public void setStorePathRootDir(String storePathRootDir) {
+ this.storePathRootDir = storePathRootDir;
+ }
+
+
+ public int getFlushLeastPagesWhenWarmMapedFile() {
+ return flushLeastPagesWhenWarmMapedFile;
+ }
+
+
+ public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
+ this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
+ }
+
+
+ public boolean isOffsetCheckInSlave() {
+ return offsetCheckInSlave;
+ }
+
+
+ public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
+ this.offsetCheckInSlave = offsetCheckInSlave;
+ }
+
+ public int getDefaultQueryMaxNum() {
+ return defaultQueryMaxNum;
+ }
+
+ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
+ this.defaultQueryMaxNum = defaultQueryMaxNum;
+ }
+
+ /**
+ * Enable transient commitLog store pool only if transientStorePoolEnable is true, the FlushDiskType is
+ * ASYNC_FLUSH and the broker role is not SLAVE
+ * @return <tt>true</tt> or <tt>false</tt>
+ */
+ public boolean isTransientStorePoolEnable() {
+ return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
+ && BrokerRole.SLAVE != getBrokerRole();
+ }
+
+ public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
+ this.transientStorePoolEnable = transientStorePoolEnable;
+ }
+
+ public int getTransientStorePoolSize() {
+ return transientStorePoolSize;
+ }
+
+ public void setTransientStorePoolSize(final int transientStorePoolSize) {
+ this.transientStorePoolSize = transientStorePoolSize;
+ }
+
+ public int getCommitIntervalCommitLog() {
+ return commitIntervalCommitLog;
+ }
+
+ public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
+ this.commitIntervalCommitLog = commitIntervalCommitLog;
+ }
+
+ public boolean isFastFailIfNoBufferInStorePool() {
+ return fastFailIfNoBufferInStorePool;
+ }
+
+ public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
+ this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool;
+ }
+
+ public boolean isUseReentrantLockWhenPutMessage() {
+ return useReentrantLockWhenPutMessage;
+ }
+
+ public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {
+ this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;
+ }
+
+ public int getCommitCommitLogLeastPages() {
+ return commitCommitLogLeastPages;
+ }
+
+ public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {
+ this.commitCommitLogLeastPages = commitCommitLogLeastPages;
+ }
+
+ public int getCommitCommitLogThoroughInterval() {
+ return commitCommitLogThoroughInterval;
+ }
+
+ public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
+ this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
new file mode 100644
index 0000000..d3d77c0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+import java.io.File;
+
+
+/**
+ * Builds the locations of the store's files and directories below a given root directory.
+ * Every method only concatenates strings; nothing is created or checked on disk.
+ */
+public class StorePathConfigHelper {
+
+    /** Returns {@code <rootDir>/consumequeue}. */
+    public static String getStorePathConsumeQueue(final String rootDir) {
+        return under(rootDir, "consumequeue");
+    }
+
+    /** Returns {@code <rootDir>/index}. */
+    public static String getStorePathIndex(final String rootDir) {
+        return under(rootDir, "index");
+    }
+
+    /** Returns {@code <rootDir>/checkpoint}. */
+    public static String getStoreCheckpoint(final String rootDir) {
+        return under(rootDir, "checkpoint");
+    }
+
+    /** Returns {@code <rootDir>/abort}. */
+    public static String getAbortFile(final String rootDir) {
+        return under(rootDir, "abort");
+    }
+
+    /** Returns {@code <rootDir>/config/delayOffset.json}. */
+    public static String getDelayOffsetStorePath(final String rootDir) {
+        return under(rootDir, "config", "delayOffset.json");
+    }
+
+    /** Returns {@code <rootDir>/transaction/statetable}. */
+    public static String getTranStateTableStorePath(final String rootDir) {
+        return under(rootDir, "transaction", "statetable");
+    }
+
+    /** Returns {@code <rootDir>/transaction/redolog}. */
+    public static String getTranRedoLogStorePath(final String rootDir) {
+        return under(rootDir, "transaction", "redolog");
+    }
+
+    /**
+     * Appends each segment to {@code rootDir}, each preceded by {@link File#separator}.
+     * A null {@code rootDir} is rendered as the text "null", exactly as string concatenation would.
+     */
+    private static String under(final String rootDir, final String... segments) {
+        final StringBuilder path = new StringBuilder().append(rootDir);
+        for (final String segment : segments) {
+            path.append(File.separator).append(segment);
+        }
+        return path.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
new file mode 100644
index 0000000..1773059
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.store.SelectMappedBufferResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class HAConnection {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private final HAService haService;
+ private final SocketChannel socketChannel;
+ private final String clientAddr;
+ private WriteSocketService writeSocketService;
+ private ReadSocketService readSocketService;
+
+ private volatile long slaveRequestOffset = -1;
+ private volatile long slaveAckOffset = -1;
+
+
+ /**
+ * Wraps the given channel: configures its socket, creates the read and write service
+ * threads (they are not started here, see start()) and increments the HA service's
+ * connection count.
+ *
+ * @param haService HA service this connection belongs to
+ * @param socketChannel channel to the remote peer; must be connected, since its remote address is read here
+ * @throws IOException if configuring the socket or setting up a service's selector fails
+ */
+ public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
+ this.haService = haService;
+ this.socketChannel = socketChannel;
+ this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
+ // Non-blocking mode is required before the channel can be registered with a Selector.
+ this.socketChannel.configureBlocking(false);
+ // Disable SO_LINGER.
+ this.socketChannel.socket().setSoLinger(false, -1);
+ this.socketChannel.socket().setTcpNoDelay(true);
+ // Request 64 KB receive and send buffers.
+ this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+ this.socketChannel.socket().setSendBufferSize(1024 * 64);
+ this.writeSocketService = new WriteSocketService(this.socketChannel);
+ this.readSocketService = new ReadSocketService(this.socketChannel);
+ this.haService.getConnectionCount().incrementAndGet();
+ }
+
+
+ /**
+ * Starts the read service thread and then the write service thread.
+ */
+ public void start() {
+ this.readSocketService.start();
+ this.writeSocketService.start();
+ }
+
+
+ /**
+ * Shuts down the write service, then the read service, and finally closes the socket channel.
+ * NOTE(review): the {@code true} argument presumably asks ServiceThread to interrupt the
+ * thread - confirm in ServiceThread#shutdown(boolean).
+ */
+ public void shutdown() {
+ this.writeSocketService.shutdown(true);
+ this.readSocketService.shutdown(true);
+ this.close();
+ }
+
+
+ public void close() {
+ if (this.socketChannel != null) {
+ try {
+ this.socketChannel.close();
+ } catch (IOException e) {
+ HAConnection.log.error("", e);
+ }
+ }
+ }
+
+
+ /**
+ * @return the socket channel this connection was created with
+ */
+ public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ /**
+  * Service thread that reads the offsets reported by the slave over this connection.
+  * <p>
+  * The most recent complete 8-byte value in the read buffer is taken as a {@code long} offset.
+  * It is stored as the slave's ack offset, it also becomes the slave's request offset if none has
+  * been recorded yet, and it is passed to {@code HAService#notifyTransferSome}. The loop ends
+  * when reading fails, when nothing has been read for longer than {@code haHousekeepingInterval},
+  * or when the service is stopped; the thread then stops the write service, removes the
+  * connection from the HA service and closes its selector and the channel.
+  *
+  * @author shijia.wxr
+  */
+ class ReadSocketService extends ServiceThread {
+     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+     private final Selector selector;
+     private final SocketChannel socketChannel;
+     private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+     // Buffer index up to which received bytes have already been interpreted.
+     private int processPostion = 0;
+     private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+
+     /**
+      * Opens a selector and registers the channel with it for read events.
+      *
+      * @param socketChannel channel to read from; must already be in non-blocking mode
+      * @throws IOException if the selector cannot be opened or the channel cannot be registered
+      */
+     public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+         this.selector = RemotingUtil.openSelector();
+         this.socketChannel = socketChannel;
+         this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+         this.thread.setDaemon(true);
+     }
+
+
+     @Override
+     public void run() {
+         HAConnection.log.info(this.getServiceName() + " service started");
+
+         while (!this.isStopped()) {
+             try {
+                 this.selector.select(1000);
+                 boolean ok = this.processReadEvent();
+                 if (!ok) {
+                     HAConnection.log.error("processReadEvent error");
+                     break;
+                 }
+
+                 // Housekeeping: give up on a connection that has been silent for too long.
+                 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                     log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
+                     break;
+                 }
+             } catch (Exception e) {
+                 HAConnection.log.error(this.getServiceName() + " service has exception.", e);
+                 break;
+             }
+         }
+
+         // Tear down: stop both services and unregister the connection.
+         this.makeStop();
+
+         writeSocketService.makeStop();
+
+         haService.removeConnection(HAConnection.this);
+
+         HAConnection.this.haService.getConnectionCount().decrementAndGet();
+
+         SelectionKey sk = this.socketChannel.keyFor(this.selector);
+         if (sk != null) {
+             sk.cancel();
+         }
+
+         // Close the selector and the channel independently, so that a failure to close
+         // the selector cannot leave the socket open.
+         try {
+             this.selector.close();
+         } catch (IOException e) {
+             HAConnection.log.error("", e);
+         }
+
+         try {
+             this.socketChannel.close();
+         } catch (IOException e) {
+             HAConnection.log.error("", e);
+         }
+
+         HAConnection.log.info(this.getServiceName() + " service end");
+     }
+
+     @Override
+     public String getServiceName() {
+         return ReadSocketService.class.getSimpleName();
+     }
+
+     /**
+      * Reads whatever is available from the channel and processes the latest reported offset.
+      *
+      * @return {@code false} if the channel reported end-of-stream or reading threw an
+      *         {@link IOException}; {@code true} otherwise
+      */
+     private boolean processReadEvent() {
+         int readSizeZeroTimes = 0;
+
+         // Buffer is full: position == limit == capacity here, so flip() rewinds the position
+         // to 0 and leaves the limit at capacity, making the whole buffer writable again.
+         if (!this.byteBufferRead.hasRemaining()) {
+             this.byteBufferRead.flip();
+             this.processPostion = 0;
+         }
+
+         while (this.byteBufferRead.hasRemaining()) {
+             try {
+                 int readSize = this.socketChannel.read(this.byteBufferRead);
+                 if (readSize > 0) {
+                     readSizeZeroTimes = 0;
+                     this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+                     if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
+                         // Only the last complete 8-byte value matters; earlier ones are skipped.
+                         int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
+                         long readOffset = this.byteBufferRead.getLong(pos - 8);
+                         this.processPostion = pos;
+
+                         HAConnection.this.slaveAckOffset = readOffset;
+                         if (HAConnection.this.slaveRequestOffset < 0) {
+                             HAConnection.this.slaveRequestOffset = readOffset;
+                             log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
+                         }
+
+                         HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
+                     }
+                 } else if (readSize == 0) {
+                     // Nothing available right now; stop after three empty reads in a row.
+                     if (++readSizeZeroTimes >= 3) {
+                         break;
+                     }
+                 } else {
+                     // A negative read size means the peer closed the connection.
+                     log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
+                     return false;
+                 }
+             } catch (IOException e) {
+                 log.error("processReadEvent exception", e);
+                 return false;
+             }
+         }
+
+         return true;
+     }
+ }
+
+ /**
+
+ *
+ * @author shijia.wxr
+ */
+ class WriteSocketService extends ServiceThread {
+ private final Selector selector;
+ private final SocketChannel socketChannel;
+
+ private final int headerSize = 8 + 4;
+ private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
+ private long nextTransferFromWhere = -1;
+ private SelectMappedBufferResult selectMappedBufferResult;
+ private boolean lastWriteOver = true;
+ private long lastWriteTimestamp = System.currentTimeMillis();
+
+
+ /**
+ * Opens a selector and registers the channel with it for write events.
+ *
+ * @param socketChannel channel to write to; must already be in non-blocking mode
+ * @throws IOException if the selector cannot be opened or the channel cannot be registered
+ */
+ public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+ this.selector = RemotingUtil.openSelector();
+ this.socketChannel = socketChannel;
+ this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+ this.thread.setDaemon(true);
+ }
+
+
+ @Override
+ public void run() {
+ HAConnection.log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.selector.select(1000);
+
+ if (-1 == HAConnection.this.slaveRequestOffset) {
+ Thread.sleep(10);
+ continue;
+ }
+
+
+
+ if (-1 == this.nextTransferFromWhere) {
+ if (0 == HAConnection.this.slaveRequestOffset) {
+ long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
+ masterOffset =
+ masterOffset
+ - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
+ .getMapedFileSizeCommitLog());
+
+ if (masterOffset < 0) {
+ masterOffset = 0;
+ }
+
+ this.nextTransferFromWhere = masterOffset;
+ } else {
+ this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
+ }
+
+ log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ + "], and slave request " + HAConnection.this.slaveRequestOffset);
+ }
+
+ if (this.lastWriteOver) {
+
+ long interval =
+ HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+ if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
+ .getHaSendHeartbeatInterval()) {
+
+ // Build Header
+ this.byteBufferHeader.position(0);
+ this.byteBufferHeader.limit(headerSize);
+ this.byteBufferHeader.putLong(this.nextTransferFromWhere);
+ this.byteBufferHeader.putInt(0);
+ this.byteBufferHeader.flip();
+
+ this.lastWriteOver = this.transferData();
+ if (!this.lastWriteOver)
+ continue;
+ }
+ }
+
+ else {
+ this.lastWriteOver = this.transferData();
+ if (!this.lastWriteOver)
+ continue;
+ }
+
+ SelectMappedBufferResult selectResult =
+ HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+ if (selectResult != null) {
+ int size = selectResult.getSize();
+ if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+ size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+ }
+
+ long thisOffset = this.nextTransferFromWhere;
+ this.nextTransferFromWhere += size;
+
+ selectResult.getByteBuffer().limit(size);
+ this.selectMappedBufferResult = selectResult;
+
+ // Build Header
+ this.byteBufferHeader.position(0);
+ this.byteBufferHeader.limit(headerSize);
+ this.byteBufferHeader.putLong(thisOffset);
+ this.byteBufferHeader.putInt(size);
+ this.byteBufferHeader.flip();
+
+ this.lastWriteOver = this.transferData();
+ } else {
+
+ HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
+ }
+ } catch (Exception e) {
+
+ HAConnection.log.error(this.getServiceName() + " service has exception.", e);
+ break;
+ }
+ }
+
+
+ if (this.selectMappedBufferResult != null) {
+ this.selectMappedBufferResult.release();
+ }
+
+ this.makeStop();
+
+ readSocketService.makeStop();
+
+
+ haService.removeConnection(HAConnection.this);
+
+ SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ if (sk != null) {
+ sk.cancel();
+ }
+
+ try {
+ this.selector.close();
+ this.socketChannel.close();
+ } catch (IOException e) {
+ HAConnection.log.error("", e);
+ }
+
+ HAConnection.log.info(this.getServiceName() + " service end");
+ }
+
+
+ /**
+
+ */
+ private boolean transferData() throws Exception {
+ int writeSizeZeroTimes = 0;
+ // Write Header
+ while (this.byteBufferHeader.hasRemaining()) {
+ int writeSize = this.socketChannel.write(this.byteBufferHeader);
+ if (writeSize > 0) {
+ writeSizeZeroTimes = 0;
+ this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+ } else if (writeSize == 0) {
+ if (++writeSizeZeroTimes >= 3) {
+ break;
+ }
+ } else {
+ throw new Exception("ha master write header error < 0");
+ }
+ }
+
+ if (null == this.selectMappedBufferResult) {
+ return !this.byteBufferHeader.hasRemaining();
+ }
+
+ writeSizeZeroTimes = 0;
+
+ // Write Body
+ if (!this.byteBufferHeader.hasRemaining()) {
+ while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
+ int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
+ if (writeSize > 0) {
+ writeSizeZeroTimes = 0;
+ this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+ } else if (writeSize == 0) {
+ if (++writeSizeZeroTimes >= 3) {
+ break;
+ }
+ } else {
+ throw new Exception("ha master write body error < 0");
+ }
+ }
+ }
+
+ boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
+
+ if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
+ this.selectMappedBufferResult.release();
+ this.selectMappedBufferResult = null;
+ }
+
+ return result;
+ }
+
+
+ /**
+  * Returns the simple class name of {@code WriteSocketService} as this service's name.
+  *
+  * @return the result of {@code WriteSocketService.class.getSimpleName()}
+  */
+ @Override
+ public String getServiceName() {
+ return WriteSocketService.class.getSimpleName();
+ }
+
+
+ /**
+  * Delegates to the superclass {@code shutdown()} without adding any behavior of its own.
+  */
+ @Override
+ public void shutdown() {
+ // Pure pass-through to the inherited implementation.
+ super.shutdown();
+ }
+ }
+}