You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:25 UTC

[08/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
new file mode 100644
index 0000000..b5ed3f3
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreStatsService.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class StoreStatsService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private static final int FREQUENCY_OF_SAMPLING = 1000;
+
+    private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;
+    private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[]{
+        "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
+    };
+
+    private static int printTPSInterval = 60 * 1;
+
+    private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
+
+    private final Map<String, AtomicLong> putMessageTopicTimesTotal =
+            new ConcurrentHashMap<String, AtomicLong>(128);
+    private final Map<String, AtomicLong> putMessageTopicSizeTotal =
+            new ConcurrentHashMap<String, AtomicLong>(128);
+
+    private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
+    private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
+    private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0);
+    private final LinkedList<CallSnapshot> putTimesList = new LinkedList<CallSnapshot>();
+
+    private final LinkedList<CallSnapshot> getTimesFoundList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
+    private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();
+    private volatile AtomicLong[] putMessageDistributeTime;
+    private long messageStoreBootTimestamp = System.currentTimeMillis();
+    private volatile long putMessageEntireTimeMax = 0;
+    private volatile long getMessageEntireTimeMax = 0;
+    // for putMessageEntireTimeMax
+    private ReentrantLock lockPut = new ReentrantLock();
+    // for getMessageEntireTimeMax
+    private ReentrantLock lockGet = new ReentrantLock();
+
+    private volatile long dispatchMaxBuffer = 0;
+
+    private ReentrantLock lockSampling = new ReentrantLock();
+    private long lastPrintTimestamp = System.currentTimeMillis();
+
+
+    public StoreStatsService() {
+        this.initPutMessageDistributeTime();
+    }
+
+    private AtomicLong[] initPutMessageDistributeTime() {
+        AtomicLong[] next = new AtomicLong[13];
+        for (int i = 0; i < next.length; i++) {
+            next[i] = new AtomicLong(0);
+        }
+
+        AtomicLong[] old = this.putMessageDistributeTime;
+
+        this.putMessageDistributeTime = next;
+
+        return old;
+    }
+
+    public long getPutMessageEntireTimeMax() {
+        return putMessageEntireTimeMax;
+    }
+
+    public void setPutMessageEntireTimeMax(long value) {
+        final AtomicLong[] times = this.putMessageDistributeTime;
+
+        if (null == times) return;
+
+        // us
+        if (value <= 0) {
+            times[0].incrementAndGet();
+        } else if (value < 10) {
+            times[1].incrementAndGet();
+        } else if (value < 50) {
+            times[2].incrementAndGet();
+        } else if (value < 100) {
+            times[3].incrementAndGet();
+        } else if (value < 200) {
+            times[4].incrementAndGet();
+        } else if (value < 500) {
+            times[5].incrementAndGet();
+        } else if (value < 1000) {
+            times[6].incrementAndGet();
+        }
+        // 2s
+        else if (value < 2000) {
+            times[7].incrementAndGet();
+        }
+        // 3s
+        else if (value < 3000) {
+            times[8].incrementAndGet();
+        }
+        // 4s
+        else if (value < 4000) {
+            times[9].incrementAndGet();
+        }
+        // 5s
+        else if (value < 5000) {
+            times[10].incrementAndGet();
+        }
+        // 10s
+        else if (value < 10000) {
+            times[11].incrementAndGet();
+        } else {
+            times[12].incrementAndGet();
+        }
+
+        if (value > this.putMessageEntireTimeMax) {
+            this.lockPut.lock();
+            this.putMessageEntireTimeMax =
+                    value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax;
+            this.lockPut.unlock();
+        }
+    }
+
+
+    public long getGetMessageEntireTimeMax() {
+        return getMessageEntireTimeMax;
+    }
+
+
+    public void setGetMessageEntireTimeMax(long value) {
+        if (value > this.getMessageEntireTimeMax) {
+            this.lockGet.lock();
+            this.getMessageEntireTimeMax =
+                    value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax;
+            this.lockGet.unlock();
+        }
+    }
+
+
+    public long getDispatchMaxBuffer() {
+        return dispatchMaxBuffer;
+    }
+
+
+    public void setDispatchMaxBuffer(long value) {
+        this.dispatchMaxBuffer = value > this.dispatchMaxBuffer ? value : this.dispatchMaxBuffer;
+    }
+
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder(1024);
+        Long totalTimes = getPutMessageTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+
+        sb.append("\truntime: " + this.getFormatRuntime() + "\r\n");
+        sb.append("\tputMessageEntireTimeMax: " + this.putMessageEntireTimeMax + "\r\n");
+        sb.append("\tputMessageTimesTotal: " + totalTimes + "\r\n");
+        sb.append("\tputMessageSizeTotal: " + this.getPutMessageSizeTotal() + "\r\n");
+        sb.append("\tputMessageDistributeTime: " + this.getPutMessageDistributeTimeStringInfo(totalTimes)
+                + "\r\n");
+        sb.append("\tputMessageAverageSize: " + (this.getPutMessageSizeTotal() / totalTimes.doubleValue())
+                + "\r\n");
+        sb.append("\tdispatchMaxBuffer: " + this.dispatchMaxBuffer + "\r\n");
+        sb.append("\tgetMessageEntireTimeMax: " + this.getMessageEntireTimeMax + "\r\n");
+        sb.append("\tputTps: " + this.getPutTps() + "\r\n");
+        sb.append("\tgetFoundTps: " + this.getGetFoundTps() + "\r\n");
+        sb.append("\tgetMissTps: " + this.getGetMissTps() + "\r\n");
+        sb.append("\tgetTotalTps: " + this.getGetTotalTps() + "\r\n");
+        sb.append("\tgetTransferedTps: " + this.getGetTransferedTps() + "\r\n");
+        return sb.toString();
+    }
+
+    public long getPutMessageTimesTotal() {
+        long rs = 0;
+        for (AtomicLong data : putMessageTopicTimesTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    private String getFormatRuntime() {
+        final long millisecond = 1;
+        final long second = 1000 * millisecond;
+        final long minute = 60 * second;
+        final long hour = 60 * minute;
+        final long day = 24 * hour;
+        final MessageFormat messageFormat = new MessageFormat("[ {0} days, {1} hours, {2} minutes, {3} seconds ]");
+
+        long time = System.currentTimeMillis() - this.messageStoreBootTimestamp;
+        long days = time / day;
+        long hours = (time % day) / hour;
+        long minutes = (time % hour) / minute;
+        long seconds = (time % minute) / second;
+        return messageFormat.format(new Long[]{days, hours, minutes, seconds});
+    }
+
+    public long getPutMessageSizeTotal() {
+        long rs = 0;
+        for (AtomicLong data : putMessageTopicSizeTotal.values()) {
+            rs += data.get();
+        }
+        return rs;
+    }
+
+    private String getPutMessageDistributeTimeStringInfo(Long total) {
+        return this.putMessageDistributeTimeToString();
+    }
+
+    private String getPutTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getPutTps(10));
+        sb.append(" ");
+
+
+        sb.append(this.getPutTps(60));
+        sb.append(" ");
+
+
+        sb.append(this.getPutTps(600));
+
+        return sb.toString();
+    }
+
+    private String getGetFoundTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getGetFoundTps(10));
+        sb.append(" ");
+
+
+        sb.append(this.getGetFoundTps(60));
+        sb.append(" ");
+
+
+        sb.append(this.getGetFoundTps(600));
+
+        return sb.toString();
+    }
+
+    private String getGetMissTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getGetMissTps(10));
+        sb.append(" ");
+
+
+        sb.append(this.getGetMissTps(60));
+        sb.append(" ");
+
+
+        sb.append(this.getGetMissTps(600));
+
+        return sb.toString();
+    }
+
+    private String getGetTotalTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getGetTotalTps(10));
+        sb.append(" ");
+
+
+        sb.append(this.getGetTotalTps(60));
+        sb.append(" ");
+
+
+        sb.append(this.getGetTotalTps(600));
+
+        return sb.toString();
+    }
+
+    private String getGetTransferedTps() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(this.getGetTransferedTps(10));
+        sb.append(" ");
+
+
+        sb.append(this.getGetTransferedTps(60));
+        sb.append(" ");
+
+
+        sb.append(this.getGetTransferedTps(600));
+
+        return sb.toString();
+    }
+
+    private String putMessageDistributeTimeToString() {
+        final AtomicLong[] times = this.putMessageDistributeTime;
+        if (null == times) return null;
+
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < times.length; i++) {
+            long value = times[i].get();
+            sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
+            sb.append(" ");
+        }
+
+        return sb.toString();
+    }
+
+    private String getPutTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.putTimesList.getLast();
+
+            if (this.putTimesList.size() > time) {
+                CallSnapshot lastBefore = this.putTimesList.get(this.putTimesList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+        return result;
+    }
+
+    private String getGetFoundTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.getTimesFoundList.getLast();
+
+            if (this.getTimesFoundList.size() > time) {
+                CallSnapshot lastBefore =
+                        this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+        } finally {
+            this.lockSampling.unlock();
+        }
+
+        return result;
+    }
+
+    private String getGetMissTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.getTimesMissList.getLast();
+
+            if (this.getTimesMissList.size() > time) {
+                CallSnapshot lastBefore =
+                        this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+
+        return result;
+    }
+
+    private String getGetTotalTps(int time) {
+        this.lockSampling.lock();
+        double found = 0;
+        double miss = 0;
+        try {
+            {
+                CallSnapshot last = this.getTimesFoundList.getLast();
+
+                if (this.getTimesFoundList.size() > time) {
+                    CallSnapshot lastBefore =
+                            this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
+                    found = CallSnapshot.getTPS(lastBefore, last);
+                }
+            }
+            {
+                CallSnapshot last = this.getTimesMissList.getLast();
+
+                if (this.getTimesMissList.size() > time) {
+                    CallSnapshot lastBefore =
+                            this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1));
+                    miss = CallSnapshot.getTPS(lastBefore, last);
+                }
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+
+        return Double.toString(found + miss);
+    }
+
+    private String getGetTransferedTps(int time) {
+        String result = "";
+        this.lockSampling.lock();
+        try {
+            CallSnapshot last = this.transferedMsgCountList.getLast();
+
+            if (this.transferedMsgCountList.size() > time) {
+                CallSnapshot lastBefore =
+                        this.transferedMsgCountList.get(this.transferedMsgCountList.size() - (time + 1));
+                result += CallSnapshot.getTPS(lastBefore, last);
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+
+        return result;
+    }
+
+    public HashMap<String, String> getRuntimeInfo() {
+        HashMap<String, String> result = new HashMap<String, String>(64);
+
+        Long totalTimes = getPutMessageTimesTotal();
+        if (0 == totalTimes) {
+            totalTimes = 1L;
+        }
+
+        result.put("bootTimestamp", String.valueOf(this.messageStoreBootTimestamp));
+        result.put("runtime", this.getFormatRuntime());
+        result.put("putMessageEntireTimeMax", String.valueOf(this.putMessageEntireTimeMax));
+        result.put("putMessageTimesTotal", String.valueOf(totalTimes));
+        result.put("putMessageSizeTotal", String.valueOf(this.getPutMessageSizeTotal()));
+        result.put("putMessageDistributeTime",
+                String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes)));
+        result.put("putMessageAverageSize",
+                String.valueOf(this.getPutMessageSizeTotal() / totalTimes.doubleValue()));
+        result.put("dispatchMaxBuffer", String.valueOf(this.dispatchMaxBuffer));
+        result.put("getMessageEntireTimeMax", String.valueOf(this.getMessageEntireTimeMax));
+        result.put("putTps", String.valueOf(this.getPutTps()));
+        result.put("getFoundTps", String.valueOf(this.getGetFoundTps()));
+        result.put("getMissTps", String.valueOf(this.getGetMissTps()));
+        result.put("getTotalTps", String.valueOf(this.getGetTotalTps()));
+        result.put("getTransferedTps", String.valueOf(this.getGetTransferedTps()));
+
+        return result;
+    }
+
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(FREQUENCY_OF_SAMPLING);
+
+                this.sampling();
+
+                this.printTps();
+            } catch (Exception e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+    @Override
+    public String getServiceName() {
+        return StoreStatsService.class.getSimpleName();
+    }
+
+    private void sampling() {
+        this.lockSampling.lock();
+        try {
+            this.putTimesList.add(new CallSnapshot(System.currentTimeMillis(), getPutMessageTimesTotal()));
+            if (this.putTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.putTimesList.removeFirst();
+            }
+
+            this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(),
+                    this.getMessageTimesTotalFound.get()));
+            if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.getTimesFoundList.removeFirst();
+            }
+
+            this.getTimesMissList.add(new CallSnapshot(System.currentTimeMillis(),
+                    this.getMessageTimesTotalMiss.get()));
+            if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.getTimesMissList.removeFirst();
+            }
+
+            this.transferedMsgCountList.add(new CallSnapshot(System.currentTimeMillis(),
+                    this.getMessageTransferedMsgCount.get()));
+            if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
+                this.transferedMsgCountList.removeFirst();
+            }
+
+        } finally {
+            this.lockSampling.unlock();
+        }
+    }
+
+    private void printTps() {
+        if (System.currentTimeMillis() > (this.lastPrintTimestamp + printTPSInterval * 1000)) {
+            this.lastPrintTimestamp = System.currentTimeMillis();
+
+            log.info("[STORETPS] put_tps {} get_found_tps {} get_miss_tps {} get_transfered_tps {}",
+                    this.getPutTps(printTPSInterval),
+                    this.getGetFoundTps(printTPSInterval),
+                    this.getGetMissTps(printTPSInterval),
+                    this.getGetTransferedTps(printTPSInterval)
+            );
+
+            final AtomicLong[] times = this.initPutMessageDistributeTime();
+            if (null == times) return;
+
+            final StringBuilder sb = new StringBuilder();
+            long totalPut = 0;
+            for (int i = 0; i < times.length; i++) {
+                long value = times[i].get();
+                totalPut += value;
+                sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
+                sb.append(" ");
+            }
+
+            log.info("[PAGECACHERT] TotalPut {}, PutMessageDistributeTime {}", totalPut, sb.toString());
+        }
+    }
+
+    public AtomicLong getGetMessageTimesTotalFound() {
+        return getMessageTimesTotalFound;
+    }
+
+
+    public AtomicLong getGetMessageTimesTotalMiss() {
+        return getMessageTimesTotalMiss;
+    }
+
+
+    public AtomicLong getGetMessageTransferedMsgCount() {
+        return getMessageTransferedMsgCount;
+    }
+
+
+    public AtomicLong getPutMessageFailedTimes() {
+        return putMessageFailedTimes;
+    }
+
+
+    public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
+        AtomicLong rs = putMessageTopicSizeTotal.get(topic);
+        if (null == rs) {
+            rs = new AtomicLong(0);
+            putMessageTopicSizeTotal.put(topic, rs);
+        }
+        return rs;
+    }
+
+
+    public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
+        AtomicLong rs = putMessageTopicTimesTotal.get(topic);
+        if (null == rs) {
+            rs = new AtomicLong(0);
+            putMessageTopicTimesTotal.put(topic, rs);
+        }
+        return rs;
+    }
+
+
+    public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
+        return putMessageTopicTimesTotal;
+    }
+
+
+    public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
+        return putMessageTopicSizeTotal;
+    }
+
+    static class CallSnapshot {
+        public final long timestamp;
+        public final long callTimesTotal;
+
+
+        public CallSnapshot(long timestamp, long callTimesTotal) {
+            this.timestamp = timestamp;
+            this.callTimesTotal = callTimesTotal;
+        }
+
+
+        public static double getTPS(final CallSnapshot begin, final CallSnapshot end) {
+            long total = end.callTimesTotal - begin.callTimesTotal;
+            Long time = end.timestamp - begin.timestamp;
+
+            double tps = total / time.doubleValue();
+
+            return tps * 1000;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
new file mode 100644
index 0000000..9e0b565
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreUtil.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class StoreUtil {
+    public static final long TOTAL_PHYSICAL_MEMORY_SIZE = getTotalPhysicalMemorySize();
+
+
+    @SuppressWarnings("restriction")
+    public static long getTotalPhysicalMemorySize() {
+        long physicalTotal = 1024 * 1024 * 1024 * 24;
+        OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
+        if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
+            physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
+        }
+
+        return physicalTotal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
new file mode 100644
index 0000000..8abe7e9
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/TransientStorePool.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import com.alibaba.rocketmq.store.util.LibC;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * @author xinyuzhou.zxy
+ */
+public class TransientStorePool {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final int poolSize;
+    private final int fileSize;
+    private final Deque<ByteBuffer> availableBuffers;
+    private final MessageStoreConfig storeConfig;
+
+    public TransientStorePool(final MessageStoreConfig storeConfig) {
+        this.storeConfig = storeConfig;
+        this.poolSize = storeConfig.getTransientStorePoolSize();
+        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
+        this.availableBuffers = new ConcurrentLinkedDeque<>();
+    }
+
+    /**
+     * It's a heavy init method.
+     */
+    public void init() {
+        for (int i = 0; i < poolSize; i++) {
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
+
+            final long address = ((DirectBuffer) byteBuffer).address();
+            Pointer pointer = new Pointer(address);
+            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
+
+            availableBuffers.offer(byteBuffer);
+        }
+    }
+
+    public void destroy() {
+        for (ByteBuffer byteBuffer : availableBuffers) {
+            final long address = ((DirectBuffer) byteBuffer).address();
+            Pointer pointer = new Pointer(address);
+            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
+        }
+    }
+
+    public void returnBuffer(ByteBuffer byteBuffer) {
+        byteBuffer.position(0);
+        byteBuffer.limit(fileSize);
+        this.availableBuffers.offerFirst(byteBuffer);
+    }
+
+    public ByteBuffer borrowBuffer() {
+        ByteBuffer buffer = availableBuffers.pollFirst();
+        if (availableBuffers.size() < poolSize * 0.4) {
+            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
+        }
+        return buffer;
+    }
+
+    public int remainBufferNumbs() {
+        if (storeConfig.isTransientStorePoolEnable()) {
+            return availableBuffers.size();
+        }
+        return Integer.MAX_VALUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
new file mode 100644
index 0000000..06714b2
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/BrokerRole.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+/**
+ * @author shijia.wxr
+ */
+public enum BrokerRole {
+    ASYNC_MASTER,
+    SYNC_MASTER,
+    SLAVE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
new file mode 100644
index 0000000..48ae4b2
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/FlushDiskType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+/**
+ * @author shijia.wxr
+ */
+public enum FlushDiskType {
+    SYNC_FLUSH,
+    ASYNC_FLUSH
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
new file mode 100644
index 0000000..138bc2e
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/MessageStoreConfig.java
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.store.ConsumeQueue;
+
+import java.io.File;
+
+
+/**
+ * @author vongosling
+ * @author shijia.wxr
+ */
+public class MessageStoreConfig {
+    //The root directory in which the log data is kept
+    @ImportantField
+    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+
+    //The directory in which the commitlog is kept
+    @ImportantField
+    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+            + File.separator + "commitlog";
+
+    // CommitLog file size,default is 1G
+    private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
+    // ConsumeQueue file size,default is 30W
+    private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+
+    // CommitLog flush interval
+    // flush data to disk
+    @ImportantField
+    private int flushIntervalCommitLog = 500;
+
+    // Only used if TransientStorePool enabled
+    // flush data to FileChannel
+    @ImportantField
+    private int commitIntervalCommitLog = 200;
+
+    private boolean useReentrantLockWhenPutMessage = false;
+
+    // Whether schedule flush,default is real-time
+    @ImportantField
+    private boolean flushCommitLogTimed = false;
+    // ConsumeQueue flush interval
+    private int flushIntervalConsumeQueue = 1000;
+    // Resource reclaim interval
+    private int cleanResourceInterval = 10000;
+    // CommitLog removal interval
+    private int deleteCommitLogFilesInterval = 100;
+    // ConsumeQueue removal interval
+    private int deleteConsumeQueueFilesInterval = 100;
+    private int destroyMapedFileIntervalForcibly = 1000 * 120;
+    private int redeleteHangedFileInterval = 1000 * 120;
+    // When to delete,default is at 4 am
+    @ImportantField
+    private String deleteWhen = "04";
+    private int diskMaxUsedSpaceRatio = 75;
+    // The number of hours to keep a log file before deleting it (in hours)
+    @ImportantField
+    private int fileReservedTime = 72;
+    // Flow control for ConsumeQueue
+    private int putMsgIndexHightWater = 600000;
+    // The maximum size of a single log file,default is 512K
+    private int maxMessageSize = 1024 * 1024 * 4;
+    // Whether check the CRC32 of the records consumed.
+    // This ensures no on-the-wire or on-disk corruption to the messages occurred.
+    // This check adds some overhead,so it may be disabled in cases seeking extreme performance.
+    private boolean checkCRCOnRecover = true;
+    // How many pages are to be flushed when flush CommitLog
+    private int flushCommitLogLeastPages = 4;
+    // How many pages are to be committed when commit data to file
+    private int commitCommitLogLeastPages = 4;
+    // Flush page size when the disk in warming state
+    private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
+    // How many pages are to be flushed when flush ConsumeQueue
+    private int flushConsumeQueueLeastPages = 2;
+    private int flushCommitLogThoroughInterval = 1000 * 10;
+    private int commitCommitLogThoroughInterval = 200;
+    private int flushConsumeQueueThoroughInterval = 1000 * 60;
+    @ImportantField
+    private int maxTransferBytesOnMessageInMemory = 1024 * 256;
+    @ImportantField
+    private int maxTransferCountOnMessageInMemory = 32;
+    @ImportantField
+    private int maxTransferBytesOnMessageInDisk = 1024 * 64;
+    @ImportantField
+    private int maxTransferCountOnMessageInDisk = 8;
+    @ImportantField
+    private int accessMessageInMemoryMaxRatio = 40;
+    @ImportantField
+    private boolean messageIndexEnable = true;
+    private int maxHashSlotNum = 5000000;
+    private int maxIndexNum = 5000000 * 4;
+    private int maxMsgsNumBatch = 64;
+    @ImportantField
+    private boolean messageIndexSafe = false;
+    private int haListenPort = 10912;
+    private int haSendHeartbeatInterval = 1000 * 5;
+    private int haHousekeepingInterval = 1000 * 20;
+    private int haTransferBatchSize = 1024 * 32;
+    @ImportantField
+    private String haMasterAddress = null;
+    private int haSlaveFallbehindMax = 1024 * 1024 * 256;
+    @ImportantField
+    private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
+    @ImportantField
+    private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
+    private int syncFlushTimeout = 1000 * 5;
+    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
+    private long flushDelayOffsetInterval = 1000 * 10;
+    @ImportantField
+    private boolean cleanFileForciblyEnable = true;
+    private boolean warmMapedFileEnable = false;
+    private boolean offsetCheckInSlave = false;
+    private boolean debugLockEnable = false;
+    private boolean duplicationEnable = false;
+    private boolean diskFallRecorded = true;
+    private long osPageCacheBusyTimeOutMills = 1000;
+    private int defaultQueryMaxNum = 32;
+
+    @ImportantField
+    private boolean transientStorePoolEnable = false;
+    private int transientStorePoolSize = 5;
+    private boolean fastFailIfNoBufferInStorePool = false;
+    
+    public boolean isDebugLockEnable() {
+        return debugLockEnable;
+    }
+
+    public void setDebugLockEnable(final boolean debugLockEnable) {
+        this.debugLockEnable = debugLockEnable;
+    }
+
+    public boolean isDuplicationEnable() {
+        return duplicationEnable;
+    }
+
+    public void setDuplicationEnable(final boolean duplicationEnable) {
+        this.duplicationEnable = duplicationEnable;
+    }
+
+    public long getOsPageCacheBusyTimeOutMills() {
+        return osPageCacheBusyTimeOutMills;
+    }
+
+    public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
+        this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
+    }
+
+    public boolean isDiskFallRecorded() {
+        return diskFallRecorded;
+    }
+
+    public void setDiskFallRecorded(final boolean diskFallRecorded) {
+        this.diskFallRecorded = diskFallRecorded;
+    }
+
+    public boolean isWarmMapedFileEnable() {
+        return warmMapedFileEnable;
+    }
+
+
+    public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
+        this.warmMapedFileEnable = warmMapedFileEnable;
+    }
+
+
+    public int getMapedFileSizeCommitLog() {
+        return mapedFileSizeCommitLog;
+    }
+
+
+    public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
+        this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
+    }
+
+
+    public int getMapedFileSizeConsumeQueue() {
+
+        int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
+        return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+    }
+
+
+    public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
+        this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
+    }
+
+
+    public int getFlushIntervalCommitLog() {
+        return flushIntervalCommitLog;
+    }
+
+
+    public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
+        this.flushIntervalCommitLog = flushIntervalCommitLog;
+    }
+
+
+    public int getFlushIntervalConsumeQueue() {
+        return flushIntervalConsumeQueue;
+    }
+
+
+    public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
+        this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
+    }
+
+
+    public int getPutMsgIndexHightWater() {
+        return putMsgIndexHightWater;
+    }
+
+
+    public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
+        this.putMsgIndexHightWater = putMsgIndexHightWater;
+    }
+
+
+    public int getCleanResourceInterval() {
+        return cleanResourceInterval;
+    }
+
+
+    public void setCleanResourceInterval(int cleanResourceInterval) {
+        this.cleanResourceInterval = cleanResourceInterval;
+    }
+
+
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
+
+    public boolean isCheckCRCOnRecover() {
+        return checkCRCOnRecover;
+    }
+
+
+    public boolean getCheckCRCOnRecover() {
+        return checkCRCOnRecover;
+    }
+
+
+    public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
+        this.checkCRCOnRecover = checkCRCOnRecover;
+    }
+
+
+    public String getStorePathCommitLog() {
+        return storePathCommitLog;
+    }
+
+
+    public void setStorePathCommitLog(String storePathCommitLog) {
+        this.storePathCommitLog = storePathCommitLog;
+    }
+
+
+    public String getDeleteWhen() {
+        return deleteWhen;
+    }
+
+
+    public void setDeleteWhen(String deleteWhen) {
+        this.deleteWhen = deleteWhen;
+    }
+
+
+    public int getDiskMaxUsedSpaceRatio() {
+        if (this.diskMaxUsedSpaceRatio < 10)
+            return 10;
+
+        if (this.diskMaxUsedSpaceRatio > 95)
+            return 95;
+
+        return diskMaxUsedSpaceRatio;
+    }
+
+
+    public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
+        this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
+    }
+
+
+    public int getDeleteCommitLogFilesInterval() {
+        return deleteCommitLogFilesInterval;
+    }
+
+
+    public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
+        this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
+    }
+
+
+    public int getDeleteConsumeQueueFilesInterval() {
+        return deleteConsumeQueueFilesInterval;
+    }
+
+
+    public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
+        this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
+    }
+
+
+    public int getMaxTransferBytesOnMessageInMemory() {
+        return maxTransferBytesOnMessageInMemory;
+    }
+
+
+    public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
+        this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
+    }
+
+
+    public int getMaxTransferCountOnMessageInMemory() {
+        return maxTransferCountOnMessageInMemory;
+    }
+
+
+    public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
+        this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
+    }
+
+
+    public int getMaxTransferBytesOnMessageInDisk() {
+        return maxTransferBytesOnMessageInDisk;
+    }
+
+
+    public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
+        this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
+    }
+
+
+    public int getMaxTransferCountOnMessageInDisk() {
+        return maxTransferCountOnMessageInDisk;
+    }
+
+
+    public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
+        this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
+    }
+
+
+    public int getFlushCommitLogLeastPages() {
+        return flushCommitLogLeastPages;
+    }
+
+
+    public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
+        this.flushCommitLogLeastPages = flushCommitLogLeastPages;
+    }
+
+
+    public int getFlushConsumeQueueLeastPages() {
+        return flushConsumeQueueLeastPages;
+    }
+
+
+    public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
+        this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
+    }
+
+
+    public int getFlushCommitLogThoroughInterval() {
+        return flushCommitLogThoroughInterval;
+    }
+
+
+    public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
+        this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
+    }
+
+
+    public int getFlushConsumeQueueThoroughInterval() {
+        return flushConsumeQueueThoroughInterval;
+    }
+
+
+    public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
+        this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
+    }
+
+
+    public int getDestroyMapedFileIntervalForcibly() {
+        return destroyMapedFileIntervalForcibly;
+    }
+
+
+    public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
+        this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
+    }
+
+
+    public int getFileReservedTime() {
+        return fileReservedTime;
+    }
+
+
+    public void setFileReservedTime(int fileReservedTime) {
+        this.fileReservedTime = fileReservedTime;
+    }
+
+
+    public int getRedeleteHangedFileInterval() {
+        return redeleteHangedFileInterval;
+    }
+
+
+    public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
+        this.redeleteHangedFileInterval = redeleteHangedFileInterval;
+    }
+
+
+    public int getAccessMessageInMemoryMaxRatio() {
+        return accessMessageInMemoryMaxRatio;
+    }
+
+
+    public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
+        this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
+    }
+
+
+    public boolean isMessageIndexEnable() {
+        return messageIndexEnable;
+    }
+
+
+    public void setMessageIndexEnable(boolean messageIndexEnable) {
+        this.messageIndexEnable = messageIndexEnable;
+    }
+
+
+    public int getMaxHashSlotNum() {
+        return maxHashSlotNum;
+    }
+
+
+    public void setMaxHashSlotNum(int maxHashSlotNum) {
+        this.maxHashSlotNum = maxHashSlotNum;
+    }
+
+
+    public int getMaxIndexNum() {
+        return maxIndexNum;
+    }
+
+
+    public void setMaxIndexNum(int maxIndexNum) {
+        this.maxIndexNum = maxIndexNum;
+    }
+
+
+    public int getMaxMsgsNumBatch() {
+        return maxMsgsNumBatch;
+    }
+
+
+    public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
+        this.maxMsgsNumBatch = maxMsgsNumBatch;
+    }
+
+
+    public int getHaListenPort() {
+        return haListenPort;
+    }
+
+
+    public void setHaListenPort(int haListenPort) {
+        this.haListenPort = haListenPort;
+    }
+
+
+    public int getHaSendHeartbeatInterval() {
+        return haSendHeartbeatInterval;
+    }
+
+
+    public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
+        this.haSendHeartbeatInterval = haSendHeartbeatInterval;
+    }
+
+
+    public int getHaHousekeepingInterval() {
+        return haHousekeepingInterval;
+    }
+
+
+    public void setHaHousekeepingInterval(int haHousekeepingInterval) {
+        this.haHousekeepingInterval = haHousekeepingInterval;
+    }
+
+
+    public BrokerRole getBrokerRole() {
+        return brokerRole;
+    }
+
+    public void setBrokerRole(BrokerRole brokerRole) {
+        this.brokerRole = brokerRole;
+    }
+
+    public void setBrokerRole(String brokerRole) {
+        this.brokerRole = BrokerRole.valueOf(brokerRole);
+    }
+
+    public int getHaTransferBatchSize() {
+        return haTransferBatchSize;
+    }
+
+
+    public void setHaTransferBatchSize(int haTransferBatchSize) {
+        this.haTransferBatchSize = haTransferBatchSize;
+    }
+
+
+    public int getHaSlaveFallbehindMax() {
+        return haSlaveFallbehindMax;
+    }
+
+
+    public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
+        this.haSlaveFallbehindMax = haSlaveFallbehindMax;
+    }
+
+
+    public FlushDiskType getFlushDiskType() {
+        return flushDiskType;
+    }
+
+    public void setFlushDiskType(FlushDiskType flushDiskType) {
+        this.flushDiskType = flushDiskType;
+    }
+
+    public void setFlushDiskType(String type) {
+        this.flushDiskType = FlushDiskType.valueOf(type);
+    }
+
+    public int getSyncFlushTimeout() {
+        return syncFlushTimeout;
+    }
+
+
+    public void setSyncFlushTimeout(int syncFlushTimeout) {
+        this.syncFlushTimeout = syncFlushTimeout;
+    }
+
+
+    public String getHaMasterAddress() {
+        return haMasterAddress;
+    }
+
+
+    public void setHaMasterAddress(String haMasterAddress) {
+        this.haMasterAddress = haMasterAddress;
+    }
+
+
+    public String getMessageDelayLevel() {
+        return messageDelayLevel;
+    }
+
+
+    public void setMessageDelayLevel(String messageDelayLevel) {
+        this.messageDelayLevel = messageDelayLevel;
+    }
+
+
+    public long getFlushDelayOffsetInterval() {
+        return flushDelayOffsetInterval;
+    }
+
+
+    public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
+        this.flushDelayOffsetInterval = flushDelayOffsetInterval;
+    }
+
+
+    public boolean isCleanFileForciblyEnable() {
+        return cleanFileForciblyEnable;
+    }
+
+
+    public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
+        this.cleanFileForciblyEnable = cleanFileForciblyEnable;
+    }
+
+
+    public boolean isMessageIndexSafe() {
+        return messageIndexSafe;
+    }
+
+
+    public void setMessageIndexSafe(boolean messageIndexSafe) {
+        this.messageIndexSafe = messageIndexSafe;
+    }
+
+
+    public boolean isFlushCommitLogTimed() {
+        return flushCommitLogTimed;
+    }
+
+
+    public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
+        this.flushCommitLogTimed = flushCommitLogTimed;
+    }
+
+
+    public String getStorePathRootDir() {
+        return storePathRootDir;
+    }
+
+
+    public void setStorePathRootDir(String storePathRootDir) {
+        this.storePathRootDir = storePathRootDir;
+    }
+
+
+    public int getFlushLeastPagesWhenWarmMapedFile() {
+        return flushLeastPagesWhenWarmMapedFile;
+    }
+
+
+    public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
+        this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
+    }
+
+
+    public boolean isOffsetCheckInSlave() {
+        return offsetCheckInSlave;
+    }
+
+
+    public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
+        this.offsetCheckInSlave = offsetCheckInSlave;
+    }
+
+    public int getDefaultQueryMaxNum() {
+        return defaultQueryMaxNum;
+    }
+
+    public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
+        this.defaultQueryMaxNum = defaultQueryMaxNum;
+    }
+
+    /**
+     * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is ASYNC_FLUSH
+     * @return <tt>true</tt> or <tt>false</tt>
+     */
+    public boolean isTransientStorePoolEnable() {
+        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
+                && BrokerRole.SLAVE != getBrokerRole();
+    }
+
+    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
+        this.transientStorePoolEnable = transientStorePoolEnable;
+    }
+
+    public int getTransientStorePoolSize() {
+        return transientStorePoolSize;
+    }
+
+    public void setTransientStorePoolSize(final int transientStorePoolSize) {
+        this.transientStorePoolSize = transientStorePoolSize;
+    }
+
+    public int getCommitIntervalCommitLog() {
+        return commitIntervalCommitLog;
+    }
+
+    public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
+        this.commitIntervalCommitLog = commitIntervalCommitLog;
+    }
+
+    public boolean isFastFailIfNoBufferInStorePool() {
+        return fastFailIfNoBufferInStorePool;
+    }
+
+    public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
+        this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool;
+    }
+
+    public boolean isUseReentrantLockWhenPutMessage() {
+        return useReentrantLockWhenPutMessage;
+    }
+
+    public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {
+        this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;
+    }
+
+    public int getCommitCommitLogLeastPages() {
+        return commitCommitLogLeastPages;
+    }
+
+    public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {
+        this.commitCommitLogLeastPages = commitCommitLogLeastPages;
+    }
+
+    public int getCommitCommitLogThoroughInterval() {
+        return commitCommitLogThoroughInterval;
+    }
+
+    public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
+        this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
new file mode 100644
index 0000000..d3d77c0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/config/StorePathConfigHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.config;
+
+import java.io.File;
+
+
+public class StorePathConfigHelper {
+
+    public static String getStorePathConsumeQueue(final String rootDir) {
+        return rootDir + File.separator + "consumequeue";
+    }
+
+
+    public static String getStorePathIndex(final String rootDir) {
+        return rootDir + File.separator + "index";
+    }
+
+
+    public static String getStoreCheckpoint(final String rootDir) {
+        return rootDir + File.separator + "checkpoint";
+    }
+
+
+    public static String getAbortFile(final String rootDir) {
+        return rootDir + File.separator + "abort";
+    }
+
+
+    public static String getDelayOffsetStorePath(final String rootDir) {
+        return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
+    }
+
+
+    public static String getTranStateTableStorePath(final String rootDir) {
+        return rootDir + File.separator + "transaction" + File.separator + "statetable";
+    }
+
+
+    public static String getTranRedoLogStorePath(final String rootDir) {
+        return rootDir + File.separator + "transaction" + File.separator + "redolog";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
new file mode 100644
index 0000000..1773059
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAConnection.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.store.SelectMappedBufferResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class HAConnection {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final HAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddr;
+    private WriteSocketService writeSocketService;
+    private ReadSocketService readSocketService;
+
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+
+
+    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+    }
+
+
+    public void start() {
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+
+    public void shutdown() {
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+
+    public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (IOException e) {
+                HAConnection.log.error("", e);
+            }
+        }
+    }
+
+
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    /**
+
+     *
+     * @author shijia.wxr
+     */
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private int processPostion = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.thread.setDaemon(true);
+        }
+
+
+        @Override
+        public void run() {
+            HAConnection.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.processReadEvent();
+                    if (!ok) {
+                        HAConnection.log.error("processReadEvent error");
+                        break;
+                    }
+
+
+                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            writeSocketService.makeStop();
+
+
+            haService.removeConnection(HAConnection.this);
+
+
+            HAConnection.this.haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                HAConnection.log.error("", e);
+            }
+
+            HAConnection.log.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        private boolean processReadEvent() {
+            int readSizeZeroTimes = 0;
+
+            if (!this.byteBufferRead.hasRemaining()) {
+                this.byteBufferRead.flip();
+                this.processPostion = 0;
+            }
+
+            while (this.byteBufferRead.hasRemaining()) {
+                try {
+                    int readSize = this.socketChannel.read(this.byteBufferRead);
+                    if (readSize > 0) {
+                        readSizeZeroTimes = 0;
+                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+                        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
+                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
+                            long readOffset = this.byteBufferRead.getLong(pos - 8);
+                            this.processPostion = pos;
+
+
+                            HAConnection.this.slaveAckOffset = readOffset;
+                            if (HAConnection.this.slaveRequestOffset < 0) {
+                                HAConnection.this.slaveRequestOffset = readOffset;
+                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
+                            }
+
+
+                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
+                        }
+                    } else if (readSize == 0) {
+                        if (++readSizeZeroTimes >= 3) {
+                            break;
+                        }
+                    } else {
+                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
+                        return false;
+                    }
+                } catch (IOException e) {
+                    log.error("processReadEvent exception", e);
+                    return false;
+                }
+            }
+
+            return true;
+        }
+    }
+
+    /**
+
+     *
+     * @author shijia.wxr
+     */
+    class WriteSocketService extends ServiceThread {
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+
+        private final int headerSize = 8 + 4;
+        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
+        private long nextTransferFromWhere = -1;
+        private SelectMappedBufferResult selectMappedBufferResult;
+        private boolean lastWriteOver = true;
+        private long lastWriteTimestamp = System.currentTimeMillis();
+
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.thread.setDaemon(true);
+        }
+
+
+        @Override
+        public void run() {
+            HAConnection.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    if (-1 == HAConnection.this.slaveRequestOffset) {
+                        Thread.sleep(10);
+                        continue;
+                    }
+
+
+
+                    if (-1 == this.nextTransferFromWhere) {
+                        if (0 == HAConnection.this.slaveRequestOffset) {
+                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
+                            masterOffset =
+                                    masterOffset
+                                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
+                                            .getMapedFileSizeCommitLog());
+
+                            if (masterOffset < 0) {
+                                masterOffset = 0;
+                            }
+
+                            this.nextTransferFromWhere = masterOffset;
+                        } else {
+                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
+                        }
+
+                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+                                + "], and slave request " + HAConnection.this.slaveRequestOffset);
+                    }
+
+                    if (this.lastWriteOver) {
+
+                        long interval =
+                                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
+                                .getHaSendHeartbeatInterval()) {
+
+                            // Build Header
+                            this.byteBufferHeader.position(0);
+                            this.byteBufferHeader.limit(headerSize);
+                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
+                            this.byteBufferHeader.putInt(0);
+                            this.byteBufferHeader.flip();
+
+                            this.lastWriteOver = this.transferData();
+                            if (!this.lastWriteOver)
+                                continue;
+                        }
+                    }
+
+                    else {
+                        this.lastWriteOver = this.transferData();
+                        if (!this.lastWriteOver)
+                            continue;
+                    }
+
+                    SelectMappedBufferResult selectResult =
+                            HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+                    if (selectResult != null) {
+                        int size = selectResult.getSize();
+                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                        }
+
+                        long thisOffset = this.nextTransferFromWhere;
+                        this.nextTransferFromWhere += size;
+
+                        selectResult.getByteBuffer().limit(size);
+                        this.selectMappedBufferResult = selectResult;
+
+                        // Build Header
+                        this.byteBufferHeader.position(0);
+                        this.byteBufferHeader.limit(headerSize);
+                        this.byteBufferHeader.putLong(thisOffset);
+                        this.byteBufferHeader.putInt(size);
+                        this.byteBufferHeader.flip();
+
+                        this.lastWriteOver = this.transferData();
+                    } else {
+
+                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
+                    }
+                } catch (Exception e) {
+
+                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+
+            this.makeStop();
+
+            readSocketService.makeStop();
+
+
+            haService.removeConnection(HAConnection.this);
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                HAConnection.log.error("", e);
+            }
+
+            HAConnection.log.info(this.getServiceName() + " service end");
+        }
+
+
+        /**
+
+         */
+        private boolean transferData() throws Exception {
+            int writeSizeZeroTimes = 0;
+            // Write Header
+            while (this.byteBufferHeader.hasRemaining()) {
+                int writeSize = this.socketChannel.write(this.byteBufferHeader);
+                if (writeSize > 0) {
+                    writeSizeZeroTimes = 0;
+                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+                } else if (writeSize == 0) {
+                    if (++writeSizeZeroTimes >= 3) {
+                        break;
+                    }
+                } else {
+                    throw new Exception("ha master write header error < 0");
+                }
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return !this.byteBufferHeader.hasRemaining();
+            }
+
+            writeSizeZeroTimes = 0;
+
+            // Write Body
+            if (!this.byteBufferHeader.hasRemaining()) {
+                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
+                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
+                    if (writeSize > 0) {
+                        writeSizeZeroTimes = 0;
+                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
+                    } else if (writeSize == 0) {
+                        if (++writeSizeZeroTimes >= 3) {
+                            break;
+                        }
+                    } else {
+                        throw new Exception("ha master write body error < 0");
+                    }
+                }
+            }
+
+            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
+
+            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
+                this.selectMappedBufferResult.release();
+                this.selectMappedBufferResult = null;
+            }
+
+            return result;
+        }
+
+
+        @Override
+        public String getServiceName() {
+            return WriteSocketService.class.getSimpleName();
+        }
+
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+        }
+    }
+}