You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:33 UTC

[08/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 6326d4b..d9e2f03 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -16,14 +16,12 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumeQueue {
 
@@ -43,13 +41,12 @@ public class ConsumeQueue {
     private long maxPhysicOffset = -1;
     private volatile long minLogicOffset = 0;
 
-
     public ConsumeQueue(
-            final String topic,
-            final int queueId,
-            final String storePath,
-            final int mappedFileSize,
-            final DefaultMessageStore defaultMessageStore) {
+        final String topic,
+        final int queueId,
+        final String storePath,
+        final int mappedFileSize,
+        final DefaultMessageStore defaultMessageStore) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
         this.defaultMessageStore = defaultMessageStore;
@@ -58,22 +55,20 @@ public class ConsumeQueue {
         this.queueId = queueId;
 
         String queueDir = this.storePath
-                + File.separator + topic
-                + File.separator + queueId;
+            + File.separator + topic
+            + File.separator + queueId;
 
         this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
 
         this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
     }
 
-
     public boolean load() {
         boolean result = this.mappedFileQueue.load();
         log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
         return result;
     }
 
-
     public void recover() {
         final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
         if (!mappedFiles.isEmpty()) {
@@ -98,18 +93,17 @@ public class ConsumeQueue {
                         this.maxPhysicOffset = offset;
                     } else {
                         log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
-                                + offset + " " + size + " " + tagsCode);
+                            + offset + " " + size + " " + tagsCode);
                         break;
                     }
                 }
 
-
                 if (mapedFileOffset == mapedFileSizeLogics) {
                     index++;
                     if (index >= mappedFiles.size()) {
 
                         log.info("recover last consume queue file over, last maped file "
-                                + mappedFile.getFileName());
+                            + mappedFile.getFileName());
                         break;
                     } else {
                         mappedFile = mappedFiles.get(index);
@@ -120,7 +114,7 @@ public class ConsumeQueue {
                     }
                 } else {
                     log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
-                            + (processOffset + mapedFileOffset));
+                        + (processOffset + mapedFileOffset));
                     break;
                 }
             }
@@ -137,8 +131,8 @@ public class ConsumeQueue {
         if (mappedFile != null) {
             long offset = 0;
             int low =
-                    minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile
-                            .getFileFromOffset()) : 0;
+                minLogicOffset > mappedFile.getFileFromOffset() ? (int)(minLogicOffset - mappedFile
+                    .getFileFromOffset()) : 0;
             int high = 0;
             int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
             long leftIndexValue = -1L, rightIndexValue = -1L;
@@ -160,7 +154,7 @@ public class ConsumeQueue {
                         }
 
                         long storeTime =
-                                this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                         if (storeTime < 0) {
                             return 0;
                         } else if (storeTime == timestamp) {
@@ -189,8 +183,8 @@ public class ConsumeQueue {
                             offset = leftOffset;
                         } else {
                             offset =
-                                    Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
-                                            - rightIndexValue) ? rightOffset : leftOffset;
+                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
+                                    - rightIndexValue) ? rightOffset : leftOffset;
                         }
                     }
 
@@ -223,7 +217,6 @@ public class ConsumeQueue {
                     int size = byteBuffer.getInt();
                     byteBuffer.getLong();
 
-
                     if (0 == i) {
                         if (offset >= phyOffet) {
                             this.mappedFileQueue.deleteLastMappedFile();
@@ -249,7 +242,6 @@ public class ConsumeQueue {
                             mappedFile.setFlushedPosition(pos);
                             this.maxPhysicOffset = offset;
 
-
                             if (pos == logicFileSize) {
                                 return;
                             }
@@ -283,7 +275,6 @@ public class ConsumeQueue {
                 int size = byteBuffer.getInt();
                 byteBuffer.getLong();
 
-
                 if (offset >= 0 && size > 0) {
                     lastOffset = offset + size;
                 } else {
@@ -295,12 +286,10 @@ public class ConsumeQueue {
         return lastOffset;
     }
 
-
     public boolean flush(final int flushLeastPages) {
         return this.mappedFileQueue.flush(flushLeastPages);
     }
 
-
     public int deleteExpiredFile(long offset) {
         int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
         this.correctMinOffset(offset);
@@ -322,7 +311,7 @@ public class ConsumeQueue {
                         if (offsetPy >= phyMinOffset) {
                             this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
                             log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: "
-                                    + this.topic + ", queueId: " + this.queueId);
+                                + this.topic + ", queueId: " + this.queueId);
                             break;
                         }
                     }
@@ -335,14 +324,12 @@ public class ConsumeQueue {
         }
     }
 
-
     public long getMinOffsetInQuque() {
         return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
     }
 
-
     public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
-                                              long logicOffset) {
+        long logicOffset) {
         final int maxRetries = 30;
         boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
         for (int i = 0; i < maxRetries && canWrite; i++) {
@@ -353,7 +340,7 @@ public class ConsumeQueue {
             } else {
                 // XXX: warn and notify me
                 log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
-                        + " failed, retry " + i + " times");
+                    + " failed, retry " + i + " times");
 
                 try {
                     Thread.sleep(1000);
@@ -369,7 +356,7 @@ public class ConsumeQueue {
     }
 
     private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
-                                           final long cqOffset) {
+        final long cqOffset) {
 
         if (offset <= this.maxPhysicOffset) {
             return true;
@@ -392,19 +379,19 @@ public class ConsumeQueue {
                 this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                 this.fillPreBlank(mappedFile, expectLogicOffset);
                 log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
-                        + mappedFile.getWrotePosition());
+                    + mappedFile.getWrotePosition());
             }
 
             if (cqOffset != 0) {
                 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                 if (expectLogicOffset != currentLogicOffset) {
                     LOG_ERROR.warn(
-                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
-                            expectLogicOffset,
-                            currentLogicOffset,
-                            this.topic,
-                            this.queueId,
-                            expectLogicOffset - currentLogicOffset
+                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+                        expectLogicOffset,
+                        currentLogicOffset,
+                        this.topic,
+                        this.queueId,
+                        expectLogicOffset - currentLogicOffset
                     );
                 }
             }
@@ -414,14 +401,13 @@ public class ConsumeQueue {
         return false;
     }
 
-
     private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
         ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
         byteBuffer.putLong(0L);
         byteBuffer.putInt(Integer.MAX_VALUE);
         byteBuffer.putLong(0L);
 
-        int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
+        int until = (int)(untilWhere % this.mappedFileQueue.getMappedFileSize());
         for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
             mappedFile.appendMessage(byteBuffer.array());
         }
@@ -433,7 +419,7 @@ public class ConsumeQueue {
         if (offset >= this.getMinLogicOffset()) {
             MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
             if (mappedFile != null) {
-                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
+                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize));
                 return result;
             }
         }
@@ -480,12 +466,10 @@ public class ConsumeQueue {
         return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque();
     }
 
-
     public long getMaxOffsetInQuque() {
         return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
     }
 
-
     public void checkSelf() {
         mappedFileQueue.checkSelf();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
index ac149f4..4ebcb3e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
-
 public class DefaultMessageFilter implements MessageFilter {
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8714055..7e3af19 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,7 +16,27 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -34,21 +54,8 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
 
-
 public class DefaultMessageStore implements MessageStore {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -84,7 +91,7 @@ public class DefaultMessageStore implements MessageStore {
     private final SystemClock systemClock = new SystemClock();
 
     private final ScheduledExecutorService scheduledExecutorService =
-            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
     private final BrokerStatsManager brokerStatsManager;
     private final MessageArrivingListener messageArrivingListener;
     private final BrokerConfig brokerConfig;
@@ -96,7 +103,7 @@ public class DefaultMessageStore implements MessageStore {
     private AtomicLong printTimes = new AtomicLong(0);
 
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
-                               final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
         this.brokerConfig = brokerConfig;
         this.messageStoreConfig = messageStoreConfig;
@@ -122,13 +129,11 @@ public class DefaultMessageStore implements MessageStore {
             this.transientStorePool.init();
         }
 
-
         this.allocateMappedFileService.start();
 
         this.indexService.start();
     }
 
-
     public void truncateDirtyLogicFiles(long phyOffset) {
         ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 
@@ -139,7 +144,6 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
-
     /**
      * @throws IOException
      */
@@ -162,11 +166,10 @@ public class DefaultMessageStore implements MessageStore {
 
             if (result) {
                 this.storeCheckpoint =
-                        new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
 
                 this.indexService.load(lastExitOK);
 
-
                 this.recover(lastExitOK);
 
                 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
@@ -191,7 +194,6 @@ public class DefaultMessageStore implements MessageStore {
         this.commitLog.start();
         this.storeStatsService.start();
 
-
         if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
             this.scheduleMessageService.start();
         }
@@ -293,19 +295,16 @@ public class DefaultMessageStore implements MessageStore {
             this.printTimes.set(0);
         }
 
-
         if (msg.getTopic().length() > Byte.MAX_VALUE) {
             log.warn("putMessage message topic length too long " + msg.getTopic().length());
             return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
         }
 
-
         if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
             log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
             return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
         }
 
-
         if (this.isOSPageCacheBusy()) {
             return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
         }
@@ -332,7 +331,7 @@ public class DefaultMessageStore implements MessageStore {
         long diff = this.systemClock.now() - begin;
 
         if (diff < 10000000 //
-                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
+            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
             return true;
         }
 
@@ -353,7 +352,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
-                                       final SubscriptionData subscriptionData) {
+        final SubscriptionData subscriptionData) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so getMessage is forbidden");
             return null;
@@ -366,7 +365,6 @@ public class DefaultMessageStore implements MessageStore {
 
         long beginTime = this.getSystemClock().now();
 
-
         GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
         long nextBeginOffset = offset;
         long minOffset = 0;
@@ -374,7 +372,6 @@ public class DefaultMessageStore implements MessageStore {
 
         GetMessageResult getResult = new GetMessageResult();
 
-
         final long maxOffsetPy = this.commitLog.getMaxOffset();
 
         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
@@ -417,21 +414,18 @@ public class DefaultMessageStore implements MessageStore {
 
                             maxPhyOffsetPulling = offsetPy;
 
-
                             if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                 if (offsetPy < nextPhyFileStartOffset)
                                     continue;
                             }
 
-
                             boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
 
                             if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
-                                    isInDisk)) {
+                                isInDisk)) {
                                 break;
                             }
 
-
                             if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                 if (selectResult != null) {
@@ -444,7 +438,6 @@ public class DefaultMessageStore implements MessageStore {
                                         status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                     }
 
-
                                     nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                 }
                             } else {
@@ -458,7 +451,6 @@ public class DefaultMessageStore implements MessageStore {
                             }
                         }
 
-
                         if (diskFallRecorded) {
                             long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                             brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
@@ -466,10 +458,9 @@ public class DefaultMessageStore implements MessageStore {
 
                         nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
-
                         long diff = maxOffsetPy - maxPhyOffsetPulling;
-                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
-                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+                        long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
+                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                         getResult.setSuggestPullingFromSlave(diff > memory);
                     } finally {
 
@@ -479,7 +470,7 @@ public class DefaultMessageStore implements MessageStore {
                     status = GetMessageStatus.OFFSET_FOUND_NULL;
                     nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                     log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
-                            + maxOffset + ", but access logic queue failed.");
+                        + maxOffset + ", but access logic queue failed.");
                 }
             }
         } else {
@@ -605,7 +596,6 @@ public class DefaultMessageStore implements MessageStore {
 
         }
 
-
         {
 
             String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
@@ -613,7 +603,6 @@ public class DefaultMessageStore implements MessageStore {
             result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
         }
 
-
         {
             if (this.scheduleMessageService != null) {
                 this.scheduleMessageService.buildRunningStats(result);
@@ -741,7 +730,6 @@ public class DefaultMessageStore implements MessageStore {
                 break;
             }
 
-
             Collections.sort(queryOffsetResult.getPhyOffsets());
 
             queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
@@ -784,12 +772,10 @@ public class DefaultMessageStore implements MessageStore {
                 }
             }
 
-
             if (queryMessageResult.getBufferTotalSize() > 0) {
                 break;
             }
 
-
             if (lastQueryMsgTime < begin) {
                 break;
             }
@@ -825,8 +811,8 @@ public class DefaultMessageStore implements MessageStore {
                 for (ConsumeQueue cq : queueTable.values()) {
                     cq.destroy();
                     log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
-                            cq.getTopic(), //
-                            cq.getQueueId() //
+                        cq.getTopic(), //
+                        cq.getQueueId() //
                     );
 
                     this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
@@ -854,23 +840,22 @@ public class DefaultMessageStore implements MessageStore {
                     Entry<Integer, ConsumeQueue> nextQT = itQT.next();
                     long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
 
-
                     if (maxCLOffsetInConsumeQueue == -1) {
                         log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", //
-                                nextQT.getValue().getTopic(), //
-                                nextQT.getValue().getQueueId(), //
-                                nextQT.getValue().getMaxPhysicOffset(), //
-                                nextQT.getValue().getMinLogicOffset());
+                            nextQT.getValue().getTopic(), //
+                            nextQT.getValue().getQueueId(), //
+                            nextQT.getValue().getMaxPhysicOffset(), //
+                            nextQT.getValue().getMinLogicOffset());
                     } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
                         log.info(
-                                "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
-                                topic, //
-                                nextQT.getKey(), //
-                                minCommitLogOffset, //
-                                maxCLOffsetInConsumeQueue);
+                            "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
+                            topic, //
+                            nextQT.getKey(), //
+                            minCommitLogOffset, //
+                            maxCLOffsetInConsumeQueue);
 
                         DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
-                                nextQT.getValue().getQueueId());
+                            nextQT.getValue().getQueueId());
 
                         nextQT.getValue().destroy();
                         itQT.remove();
@@ -910,7 +895,7 @@ public class DefaultMessageStore implements MessageStore {
                             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
                             String msgId =
-                                    MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+                                MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
                             messageIds.put(msgId, nextOffset++);
                             if (nextOffset > maxOffset) {
                                 return messageIds;
@@ -1006,11 +991,11 @@ public class DefaultMessageStore implements MessageStore {
         ConsumeQueue logic = map.get(queueId);
         if (null == logic) {
             ConsumeQueue newLogic = new ConsumeQueue(//
-                    topic, //
-                    queueId, //
-                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
-                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
-                    this);
+                topic, //
+                queueId, //
+                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
+                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
+                this);
             ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
             if (oldLogic != null) {
                 logic = oldLogic;
@@ -1031,7 +1016,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
-        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+        long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
         return (maxOffsetPy - offsetPy) > memory;
     }
 
@@ -1045,7 +1030,6 @@ public class DefaultMessageStore implements MessageStore {
             return true;
         }
 
-
         if (isInDisk) {
             if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
                 return true;
@@ -1093,7 +1077,6 @@ public class DefaultMessageStore implements MessageStore {
             }
         }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
 
-
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -1101,7 +1084,6 @@ public class DefaultMessageStore implements MessageStore {
             }
         }, 1, 10, TimeUnit.MINUTES);
 
-
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -1113,7 +1095,7 @@ public class DefaultMessageStore implements MessageStore {
 
                                 String stack = UtilAll.jstack();
                                 final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
-                                        + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
+                                    + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                                 MixAll.string2FileNotSafe(stack, fileName);
                             }
                         }
@@ -1174,11 +1156,11 @@ public class DefaultMessageStore implements MessageStore {
                             continue;
                         }
                         ConsumeQueue logic = new ConsumeQueue(
-                                topic,
-                                queueId,
-                                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
-                                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
-                                this);
+                            topic,
+                            queueId,
+                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
+                            this);
                         this.putConsumeQueue(topic, queueId, logic);
                         if (!logic.load()) {
                             return false;
@@ -1196,7 +1178,6 @@ public class DefaultMessageStore implements MessageStore {
     private void recover(final boolean lastExitOK) {
         this.recoverConsumeQueue();
 
-
         if (lastExitOK) {
             this.commitLog.recoverNormally();
         } else {
@@ -1285,7 +1266,7 @@ public class DefaultMessageStore implements MessageStore {
             case MessageSysFlag.TRANSACTION_NOT_TYPE:
             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                 DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
-                        req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
+                    req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
                 break;
             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
@@ -1298,7 +1279,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
-                                       long logicOffset) {
+        long logicOffset) {
         ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
         cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
     }
@@ -1311,23 +1292,21 @@ public class DefaultMessageStore implements MessageStore {
 
         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
         private final double diskSpaceWarningLevelRatio =
-                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
 
         private final double diskSpaceCleanForciblyRatio =
-                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
         private long lastRedeleteTimestamp = 0;
 
         private volatile int manualDeleteFileSeveralTimes = 0;
 
         private volatile boolean cleanImmediately = false;
 
-
         public void excuteDeleteFilesManualy() {
             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
             DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
         }
 
-
         public void run() {
             try {
                 this.deleteExpiredFiles();
@@ -1348,27 +1327,24 @@ public class DefaultMessageStore implements MessageStore {
             boolean spacefull = this.isSpaceToDelete();
             boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
 
-
             if (timeup || spacefull || manualDelete) {
 
                 if (manualDelete)
                     this.manualDeleteFileSeveralTimes--;
 
-
                 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
 
                 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
-                        fileReservedTime, //
-                        timeup, //
-                        spacefull, //
-                        manualDeleteFileSeveralTimes, //
-                        cleanAtOnce);
-
+                    fileReservedTime, //
+                    timeup, //
+                    spacefull, //
+                    manualDeleteFileSeveralTimes, //
+                    cleanAtOnce);
 
                 fileReservedTime *= 60 * 60 * 1000;
 
                 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
-                        destroyMapedFileIntervalForcibly, cleanAtOnce);
+                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                 if (deleteCount > 0) {
                 } else if (spacefull) {
                     log.warn("disk space will be full soon, but delete file failed.");
@@ -1382,7 +1358,7 @@ public class DefaultMessageStore implements MessageStore {
             if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
                 this.lastRedeleteTimestamp = currentTimestamp;
                 int destroyMapedFileIntervalForcibly =
-                        DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+                    DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
                 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
                 }
             }
@@ -1407,7 +1383,6 @@ public class DefaultMessageStore implements MessageStore {
 
             cleanImmediately = false;
 
-
             {
                 String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
                 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
@@ -1433,10 +1408,9 @@ public class DefaultMessageStore implements MessageStore {
                 }
             }
 
-
             {
                 String storePathLogics = StorePathConfigHelper
-                        .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+                    .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
                 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
                 if (logicsRatio > diskSpaceWarningLevelRatio) {
                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
@@ -1490,7 +1464,6 @@ public class DefaultMessageStore implements MessageStore {
             if (minOffset > this.lastPhysicalMinOffset) {
                 this.lastPhysicalMinOffset = minOffset;
 
-
                 ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 
                 for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
@@ -1506,7 +1479,6 @@ public class DefaultMessageStore implements MessageStore {
                     }
                 }
 
-
                 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
             }
         }
@@ -1520,7 +1492,6 @@ public class DefaultMessageStore implements MessageStore {
         private static final int RETRY_TIMES_OVER = 3;
         private long lastFlushTimestamp = 0;
 
-
         private void doFlush(int retryTimes) {
             int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
 
@@ -1530,7 +1501,6 @@ public class DefaultMessageStore implements MessageStore {
 
             long logicsMsgTimestamp = 0;
 
-
             int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
             long currentTimeMillis = System.currentTimeMillis();
             if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
@@ -1558,7 +1528,6 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
-
         public void run() {
             DefaultMessageStore.log.info(this.getServiceName() + " service started");
 
@@ -1572,19 +1541,16 @@ public class DefaultMessageStore implements MessageStore {
                 }
             }
 
-
             this.doFlush(RETRY_TIMES_OVER);
 
             DefaultMessageStore.log.info(this.getServiceName() + " service end");
         }
 
-
         @Override
         public String getServiceName() {
             return FlushConsumeQueueService.class.getSimpleName();
         }
 
-
         @Override
         public long getJointime() {
             return 1000 * 60;
@@ -1610,7 +1576,7 @@ public class DefaultMessageStore implements MessageStore {
 
             if (this.isCommitLogAvailable()) {
                 log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
-                        DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
+                    DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
             }
 
             super.shutdown();
@@ -1624,17 +1590,15 @@ public class DefaultMessageStore implements MessageStore {
             return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
         }
 
-
         private boolean isCommitLogAvailable() {
             return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
         }
 
-
         private void doReput() {
             for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
 
                 if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
-                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
+                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                     break;
                 }
 
@@ -1645,7 +1609,7 @@ public class DefaultMessageStore implements MessageStore {
 
                         for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                             DispatchRequest dispatchRequest =
-                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                             int size = dispatchRequest.getMsgSize();
 
                             if (dispatchRequest.isSuccess()) {
@@ -1653,20 +1617,20 @@ public class DefaultMessageStore implements MessageStore {
                                     DefaultMessageStore.this.doDispatch(dispatchRequest);
 
                                     if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
-                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
+                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                         DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
-                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
-                                                dispatchRequest.getTagsCode());
+                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                            dispatchRequest.getTagsCode());
                                     }
                                     // FIXED BUG By shijia
                                     this.reputFromOffset += size;
                                     readSize += size;
                                     if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                         DefaultMessageStore.this.storeStatsService
-                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
+                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                         DefaultMessageStore.this.storeStatsService
-                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
-                                                .addAndGet(dispatchRequest.getMsgSize());
+                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                            .addAndGet(dispatchRequest.getMsgSize());
                                     }
                                 } else if (size == 0) {
                                     this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
@@ -1674,7 +1638,6 @@ public class DefaultMessageStore implements MessageStore {
                                 }
                             } else if (!dispatchRequest.isSuccess()) {
 
-
                                 if (size > 0) {
                                     log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                     this.reputFromOffset += size;
@@ -1682,7 +1645,7 @@ public class DefaultMessageStore implements MessageStore {
                                     doNext = false;
                                     if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                         log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
-                                                this.reputFromOffset);
+                                            this.reputFromOffset);
 
                                         this.reputFromOffset += result.getSize() - readSize;
                                     }
@@ -1698,7 +1661,6 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
-
         @Override
         public void run() {
             DefaultMessageStore.log.info(this.getServiceName() + " service started");
@@ -1715,13 +1677,11 @@ public class DefaultMessageStore implements MessageStore {
             DefaultMessageStore.log.info(this.getServiceName() + " service end");
         }
 
-
         @Override
         public String getServiceName() {
             return ReputMessageService.class.getSimpleName();
         }
 
-
     }
 
     public int remainTransientStoreBufferNumbs() {
@@ -1733,7 +1693,6 @@ public class DefaultMessageStore implements MessageStore {
         return remainTransientStoreBufferNumbs() == 0;
     }
 
-
     public void unlockMappedFile(final MappedFile mappedFile) {
         this.scheduledExecutorService.schedule(new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index d0855ab..b086aee 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -31,19 +31,18 @@ public class DispatchRequest {
     private final int sysFlag;
     private final long preparedTransactionOffset;
 
-
     public DispatchRequest(
-            final String topic,
-            final int queueId,
-            final long commitLogOffset,
-            final int msgSize,
-            final long tagsCode,
-            final long storeTimestamp,
-            final long consumeQueueOffset,
-            final String keys,
-            final String uniqKey,
-            final int sysFlag,
-            final long preparedTransactionOffset
+        final String topic,
+        final int queueId,
+        final long commitLogOffset,
+        final int msgSize,
+        final long tagsCode,
+        final long storeTimestamp,
+        final long consumeQueueOffset,
+        final String keys,
+        final String uniqKey,
+        final int sysFlag,
+        final long preparedTransactionOffset
     ) {
         this.topic = topic;
         this.queueId = queueId;
@@ -108,57 +107,46 @@ public class DispatchRequest {
         this.success = success;
     }
 
-
     public String getTopic() {
         return topic;
     }
 
-
     public int getQueueId() {
         return queueId;
     }
 
-
     public long getCommitLogOffset() {
         return commitLogOffset;
     }
 
-
     public int getMsgSize() {
         return msgSize;
     }
 
-
     public long getStoreTimestamp() {
         return storeTimestamp;
     }
 
-
     public long getConsumeQueueOffset() {
         return consumeQueueOffset;
     }
 
-
     public String getKeys() {
         return keys;
     }
 
-
     public long getTagsCode() {
         return tagsCode;
     }
 
-
     public int getSysFlag() {
         return sysFlag;
     }
 
-
     public long getPreparedTransactionOffset() {
         return preparedTransactionOffset;
     }
 
-
     public boolean isSuccess() {
         return success;
     }
@@ -167,5 +155,4 @@ public class DispatchRequest {
         return uniqKey;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 0f1ba8c..b7d33f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -6,27 +6,25 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 public class GetMessageResult {
 
     private final List<SelectMappedBufferResult> messageMapedList =
-            new ArrayList<SelectMappedBufferResult>(100);
+        new ArrayList<SelectMappedBufferResult>(100);
 
     private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
 
@@ -41,97 +39,79 @@ public class GetMessageResult {
 
     private int msgCount4Commercial = 0;
 
-
     public GetMessageResult() {
     }
 
-
     public GetMessageStatus getStatus() {
         return status;
     }
 
-
     public void setStatus(GetMessageStatus status) {
         this.status = status;
     }
 
-
     public long getNextBeginOffset() {
         return nextBeginOffset;
     }
 
-
     public void setNextBeginOffset(long nextBeginOffset) {
         this.nextBeginOffset = nextBeginOffset;
     }
 
-
     public long getMinOffset() {
         return minOffset;
     }
 
-
     public void setMinOffset(long minOffset) {
         this.minOffset = minOffset;
     }
 
-
     public long getMaxOffset() {
         return maxOffset;
     }
 
-
     public void setMaxOffset(long maxOffset) {
         this.maxOffset = maxOffset;
     }
 
-
     public List<SelectMappedBufferResult> getMessageMapedList() {
         return messageMapedList;
     }
 
-
     public List<ByteBuffer> getMessageBufferList() {
         return messageBufferList;
     }
 
-
     public void addMessage(final SelectMappedBufferResult mapedBuffer) {
         this.messageMapedList.add(mapedBuffer);
         this.messageBufferList.add(mapedBuffer.getByteBuffer());
         this.bufferTotalSize += mapedBuffer.getSize();
-        this.msgCount4Commercial += (int) Math.ceil(
-                mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
+        this.msgCount4Commercial += (int)Math.ceil(
+            mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
     }
 
-
     public void release() {
         for (SelectMappedBufferResult select : this.messageMapedList) {
             select.release();
         }
     }
 
-
     public int getBufferTotalSize() {
         return bufferTotalSize;
     }
 
-
     public void setBufferTotalSize(int bufferTotalSize) {
         this.bufferTotalSize = bufferTotalSize;
     }
 
-
     public int getMessageCount() {
         return this.messageMapedList.size();
     }
 
-
     public boolean isSuggestPullingFromSlave() {
         return suggestPullingFromSlave;
     }
 
-
     public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {
         this.suggestPullingFromSlave = suggestPullingFromSlave;
     }
@@ -144,12 +124,11 @@ public class GetMessageResult {
         this.msgCount4Commercial = msgCount4Commercial;
     }
 
-
     @Override
     public String toString() {
         return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
-                + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
-                + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+            + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
+            + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
index 003d1d4..f512e12 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index ce5f570..6803ec9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -16,16 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.config.FlushDiskType;
-import org.apache.rocketmq.store.util.LibC;
 import com.sun.jna.NativeLong;
 import com.sun.jna.Pointer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -39,7 +31,13 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.util.LibC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
 
 public class MappedFile extends ReferenceResource {
     public static final int OS_PAGE_SIZE = 1024 * 4;
@@ -48,34 +46,23 @@ public class MappedFile extends ReferenceResource {
     private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0);
 
     private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0);
-
-    private String fileName;
-
-    private long fileFromOffset;
-
-    protected int fileSize;
-
-    private File file;
-
-    private MappedByteBuffer mappedByteBuffer;
-
     protected final AtomicInteger wrotePosition = new AtomicInteger(0);
-
-    private final AtomicInteger flushedPosition = new AtomicInteger(0);
     //ADD BY ChenYang
     protected final AtomicInteger committedPosition = new AtomicInteger(0);
-
-
+    private final AtomicInteger flushedPosition = new AtomicInteger(0);
+    protected int fileSize;
     protected FileChannel fileChannel;
-
-    private volatile long storeTimestamp = 0;
-    private boolean firstCreateInQueue = false;
-
     /**
      * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
      */
     protected ByteBuffer writeBuffer = null;
     protected TransientStorePool transientStorePool = null;
+    private String fileName;
+    private long fileFromOffset;
+    private File file;
+    private MappedByteBuffer mappedByteBuffer;
+    private volatile long storeTimestamp = 0;
+    private boolean firstCreateInQueue = false;
 
     public MappedFile() {
     }
@@ -88,41 +75,6 @@ public class MappedFile extends ReferenceResource {
         init(fileName, fileSize, transientStorePool);
     }
 
-    public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
-        init(fileName, fileSize);
-        this.writeBuffer = transientStorePool.borrowBuffer();
-        this.transientStorePool = transientStorePool;
-    }
-
-    private void init(final String fileName, final int fileSize) throws IOException {
-        this.fileName = fileName;
-        this.fileSize = fileSize;
-        this.file = new File(fileName);
-        this.fileFromOffset = Long.parseLong(this.file.getName());
-        boolean ok = false;
-
-        ensureDirOK(this.file.getParent());
-
-        try {
-            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
-            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
-            TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
-            TOTAL_MAPED_FILES.incrementAndGet();
-            ok = true;
-        } catch (FileNotFoundException e) {
-            log.error("create file channel " + this.fileName + " Failed. ", e);
-            throw e;
-        } catch (IOException e) {
-            log.error("map file " + this.fileName + " Failed. ", e);
-            throw e;
-        } finally {
-            if (!ok && this.fileChannel != null) {
-                this.fileChannel.close();
-            }
-        }
-    }
-
-
     public static void ensureDirOK(final String dirName) {
         if (dirName != null) {
             File f = new File(dirName);
@@ -133,14 +85,12 @@ public class MappedFile extends ReferenceResource {
         }
     }
 
-
     public static void clean(final ByteBuffer buffer) {
         if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
             return;
         invoke(invoke(viewed(buffer), "cleaner"), "clean");
     }
 
-
     private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
         return AccessController.doPrivileged(new PrivilegedAction<Object>() {
             public Object run() {
@@ -155,9 +105,8 @@ public class MappedFile extends ReferenceResource {
         });
     }
 
-
     private static Method method(Object target, String methodName, Class<?>[] args)
-            throws NoSuchMethodException {
+        throws NoSuchMethodException {
         try {
             return target.getClass().getMethod(methodName, args);
         } catch (NoSuchMethodException e) {
@@ -165,11 +114,9 @@ public class MappedFile extends ReferenceResource {
         }
     }
 
-
     private static ByteBuffer viewed(ByteBuffer buffer) {
         String methodName = "viewedBuffer";
 
-
         Method[] methods = buffer.getClass().getMethods();
         for (int i = 0; i < methods.length; i++) {
             if (methods[i].getName().equals("attachment")) {
@@ -178,23 +125,54 @@ public class MappedFile extends ReferenceResource {
             }
         }
 
-        ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
+        ByteBuffer viewedBuffer = (ByteBuffer)invoke(buffer, methodName);
         if (viewedBuffer == null)
             return buffer;
         else
             return viewed(viewedBuffer);
     }
 
-
     public static int getTotalmapedfiles() {
         return TOTAL_MAPED_FILES.get();
     }
 
-
     public static long getTotalMapedVitualMemory() {
         return TOTAL_MAPED_VITUAL_MEMORY.get();
     }
 
+    public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+        init(fileName, fileSize);
+        this.writeBuffer = transientStorePool.borrowBuffer();
+        this.transientStorePool = transientStorePool;
+    }
+
+    private void init(final String fileName, final int fileSize) throws IOException {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.file = new File(fileName);
+        this.fileFromOffset = Long.parseLong(this.file.getName());
+        boolean ok = false;
+
+        ensureDirOK(this.file.getParent());
+
+        try {
+            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
+            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+            TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
+            TOTAL_MAPED_FILES.incrementAndGet();
+            ok = true;
+        } catch (FileNotFoundException e) {
+            log.error("create file channel " + this.fileName + " Failed. ", e);
+            throw e;
+        } catch (IOException e) {
+            log.error("map file " + this.fileName + " Failed. ", e);
+            throw e;
+        } finally {
+            if (!ok && this.fileChannel != null) {
+                this.fileChannel.close();
+            }
+        }
+    }
 
     public long getLastModifiedTimestamp() {
         return this.file.lastModified();
@@ -214,20 +192,18 @@ public class MappedFile extends ReferenceResource {
 
         int currentPos = this.wrotePosition.get();
 
-
         if (currentPos < this.fileSize) {
             ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
             byteBuffer.position(currentPos);
             AppendMessageResult result =
-                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
+                cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
             this.wrotePosition.addAndGet(result.getWroteBytes());
             this.storeTimestamp = result.getStoreTimestamp();
             return result;
         }
 
-
         log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
-                + this.fileSize);
+            + this.fileSize);
         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
     }
 
@@ -246,7 +222,6 @@ public class MappedFile extends ReferenceResource {
     public boolean appendMessage(final byte[] data) {
         int currentPos = this.wrotePosition.get();
 
-
         if ((currentPos + data.length) <= this.fileSize) {
             try {
                 this.fileChannel.position(currentPos);
@@ -262,11 +237,7 @@ public class MappedFile extends ReferenceResource {
     }
 
     /**
-
-     *
      * @param flushLeastPages
-
-     *
      * @return The current flushed position
      */
     public int flush(final int flushLeastPages) {
@@ -370,12 +341,10 @@ public class MappedFile extends ReferenceResource {
         return flushedPosition.get();
     }
 
-
     public void setFlushedPosition(int pos) {
         this.flushedPosition.set(pos);
     }
 
-
     public boolean isFull() {
         return this.fileSize == this.wrotePosition.get();
     }
@@ -392,14 +361,13 @@ public class MappedFile extends ReferenceResource {
                 return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
             } else {
                 log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
-                        + this.fileFromOffset);
+                    + this.fileFromOffset);
             }
         } else {
             log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
-                    + ", fileFromOffset: " + this.fileFromOffset);
+                + ", fileFromOffset: " + this.fileFromOffset);
         }
 
-
         return null;
     }
 
@@ -419,7 +387,6 @@ public class MappedFile extends ReferenceResource {
             }
         }
 
-
         return null;
     }
 
@@ -427,13 +394,13 @@ public class MappedFile extends ReferenceResource {
     public boolean cleanup(final long currentRef) {
         if (this.isAvailable()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                    + " have not shutdown, stop unmaping.");
+                + " have not shutdown, stop unmaping.");
             return false;
         }
 
         if (this.isCleanupOver()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                    + " have cleanup, do not do it again.");
+                + " have cleanup, do not do it again.");
             return true;
         }
 
@@ -455,9 +422,9 @@ public class MappedFile extends ReferenceResource {
                 long beginTime = System.currentTimeMillis();
                 boolean result = this.file.delete();
                 log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
-                        + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
-                        + this.getFlushedPosition() + ", "
-                        + UtilAll.computeEclipseTimeMilliseconds(beginTime));
+                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+                    + this.getFlushedPosition() + ", "
+                    + UtilAll.computeEclipseTimeMilliseconds(beginTime));
             } catch (Exception e) {
                 log.warn("close file channel " + this.fileName + " Failed. ", e);
             }
@@ -465,7 +432,7 @@ public class MappedFile extends ReferenceResource {
             return true;
         } else {
             log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
-                    + " Failed. cleanupOver: " + this.cleanupOver);
+                + " Failed. cleanupOver: " + this.cleanupOver);
         }
 
         return false;
@@ -475,18 +442,17 @@ public class MappedFile extends ReferenceResource {
         return wrotePosition.get();
     }
 
+    public void setWrotePosition(int pos) {
+        this.wrotePosition.set(pos);
+    }
+
     /**
-     *
      * @return The max position which have valid data
      */
     public int getReadPosition() {
         return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
     }
 
-    public void setWrotePosition(int pos) {
-        this.wrotePosition.set(pos);
-    }
-
     public void setCommittedPosition(int pos) {
         this.committedPosition.set(pos);
     }
@@ -497,7 +463,7 @@ public class MappedFile extends ReferenceResource {
         int flush = 0;
         long time = System.currentTimeMillis();
         for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
-            byteBuffer.put(i, (byte) 0);
+            byteBuffer.put(i, (byte)0);
             // force flush when flush disk type is sync
             if (type == FlushDiskType.SYNC_FLUSH) {
                 if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
@@ -521,11 +487,11 @@ public class MappedFile extends ReferenceResource {
         // force flush when prepare load finished
         if (type == FlushDiskType.SYNC_FLUSH) {
             log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
-                    this.getFileName(), System.currentTimeMillis() - beginTime);
+                this.getFileName(), System.currentTimeMillis() - beginTime);
             mappedByteBuffer.force();
         }
         log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
-                System.currentTimeMillis() - beginTime);
+            System.currentTimeMillis() - beginTime);
 
         this.mlock();
     }
@@ -542,25 +508,21 @@ public class MappedFile extends ReferenceResource {
         return this.mappedByteBuffer.slice();
     }
 
-
     public long getStoreTimestamp() {
         return storeTimestamp;
     }
 
-
     public boolean isFirstCreateInQueue() {
         return firstCreateInQueue;
     }
 
-
     public void setFirstCreateInQueue(boolean firstCreateInQueue) {
         this.firstCreateInQueue = firstCreateInQueue;
     }
 
-
     public void mlock() {
         final long beginTime = System.currentTimeMillis();
-        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
         Pointer pointer = new Pointer(address);
         {
             int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
@@ -575,7 +537,7 @@ public class MappedFile extends ReferenceResource {
 
     public void munlock() {
         final long beginTime = System.currentTimeMillis();
-        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
         Pointer pointer = new Pointer(address);
         int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
         log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a208a07..49455c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -16,17 +16,19 @@
  */
 package org.apache.rocketmq.store;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-
 public class MappedFileQueue {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
@@ -46,15 +48,13 @@ public class MappedFileQueue {
 
     private volatile long storeTimestamp = 0;
 
-
     public MappedFileQueue(final String storePath, int mappedFileSize,
-                           AllocateMappedFileService allocateMappedFileService) {
+        AllocateMappedFileService allocateMappedFileService) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
         this.allocateMappedFileService = allocateMappedFileService;
     }
 
-
     public void checkSelf() {
 
         if (!this.mappedFiles.isEmpty()) {
@@ -66,7 +66,7 @@ public class MappedFileQueue {
                 if (pre != null) {
                     if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
                         LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
-                                pre.getFileName(), cur.getFileName());
+                            pre.getFileName(), cur.getFileName());
                     }
                 }
                 pre = cur;
@@ -74,7 +74,6 @@ public class MappedFileQueue {
         }
     }
 
-
     public MappedFile getMappedFileByTime(final long timestamp) {
         Object[] mfs = this.copyMappedFiles(0);
 
@@ -82,16 +81,15 @@ public class MappedFileQueue {
             return null;
 
         for (int i = 0; i < mfs.length; i++) {
-            MappedFile mappedFile = (MappedFile) mfs[i];
+            MappedFile mappedFile = (MappedFile)mfs[i];
             if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                 return mappedFile;
             }
         }
 
-        return (MappedFile) mfs[mfs.length - 1];
+        return (MappedFile)mfs[mfs.length - 1];
     }
 
-
     private Object[] copyMappedFiles(final int reservedMappedFiles) {
         Object[] mfs;
 
@@ -103,7 +101,6 @@ public class MappedFileQueue {
         return mfs;
     }
 
-
     public void truncateDirtyFiles(long offset) {
         List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
 
@@ -111,9 +108,9 @@ public class MappedFileQueue {
             long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
             if (fileTailOffset > offset) {
                 if (offset >= file.getFileFromOffset()) {
-                    file.setWrotePosition((int) (offset % this.mappedFileSize));
-                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
-                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
+                    file.setWrotePosition((int)(offset % this.mappedFileSize));
+                    file.setCommittedPosition((int)(offset % this.mappedFileSize));
+                    file.setFlushedPosition((int)(offset % this.mappedFileSize));
                 } else {
                     file.destroy(1000);
                     willRemoveFiles.add(file);
@@ -124,7 +121,6 @@ public class MappedFileQueue {
         this.deleteExpiredFile(willRemoveFiles);
     }
 
-
     private void deleteExpiredFile(List<MappedFile> files) {
 
         if (!files.isEmpty()) {
@@ -148,7 +144,6 @@ public class MappedFileQueue {
         }
     }
 
-
     public boolean load() {
         File dir = new File(this.storePath);
         File[] files = dir.listFiles();
@@ -159,11 +154,10 @@ public class MappedFileQueue {
 
                 if (file.length() != this.mappedFileSize) {
                     log.warn(file + "\t" + file.length()
-                            + " length not matched message store config value, ignore it");
+                        + " length not matched message store config value, ignore it");
                     return true;
                 }
 
-
                 try {
                     MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
 
@@ -182,7 +176,6 @@ public class MappedFileQueue {
         return true;
     }
 
-
     public long howMuchFallBehind() {
         if (this.mappedFiles.isEmpty())
             return 0;
@@ -198,7 +191,6 @@ public class MappedFileQueue {
         return 0;
     }
 
-
     public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
         long createOffset = -1;
         MappedFile mappedFileLast = getLastMappedFile();
@@ -214,12 +206,12 @@ public class MappedFileQueue {
         if (createOffset != -1 && needCreate) {
             String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
             String nextNextFilePath = this.storePath + File.separator
-                    + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
+                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
             MappedFile mappedFile = null;
 
             if (this.allocateMappedFileService != null) {
                 mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
-                        nextNextFilePath, this.mappedFileSize);
+                    nextNextFilePath, this.mappedFileSize);
             } else {
                 try {
                     mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
@@ -268,11 +260,12 @@ public class MappedFileQueue {
 
         if (mappedFileLast != null) {
             long lastOffset = mappedFileLast.getFileFromOffset() +
-                    mappedFileLast.getWrotePosition();
+                mappedFileLast.getWrotePosition();
             long diff = lastOffset - offset;
 
             final int maxDiff = this.mappedFileSize * 2;
-            if (diff > maxDiff) return false;
+            if (diff > maxDiff)
+                return false;
         }
 
         ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
@@ -280,7 +273,7 @@ public class MappedFileQueue {
         while (iterator.hasPrevious()) {
             mappedFileLast = iterator.previous();
             if (offset >= mappedFileLast.getFileFromOffset()) {
-                int where = (int) (offset % mappedFileLast.getFileSize());
+                int where = (int)(offset % mappedFileLast.getFileSize());
                 mappedFileLast.setFlushedPosition(where);
                 mappedFileLast.setWrotePosition(where);
                 mappedFileLast.setCommittedPosition(where);
@@ -306,7 +299,6 @@ public class MappedFileQueue {
         return -1;
     }
 
-
     public long getMaxOffset() {
         MappedFile mappedFile = getLastMappedFile();
         if (mappedFile != null) {
@@ -342,9 +334,9 @@ public class MappedFileQueue {
     }
 
     public int deleteExpiredFileByTime(final long expiredTime,
-                                       final int deleteFilesInterval,
-                                       final long intervalForcibly,
-                                       final boolean cleanImmediately) {
+        final int deleteFilesInterval,
+        final long intervalForcibly,
+        final boolean cleanImmediately) {
         Object[] mfs = this.copyMappedFiles(0);
 
         if (null == mfs)
@@ -355,7 +347,7 @@ public class MappedFileQueue {
         List<MappedFile> files = new ArrayList<MappedFile>();
         if (null != mfs) {
             for (int i = 0; i < mfsLength; i++) {
-                MappedFile mappedFile = (MappedFile) mfs[i];
+                MappedFile mappedFile = (MappedFile)mfs[i];
                 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                     if (mappedFile.destroy(intervalForcibly)) {
@@ -384,7 +376,6 @@ public class MappedFileQueue {
         return deleteCount;
     }
 
-
     public int deleteExpiredFileByOffset(long offset, int unitSize) {
         Object[] mfs = this.copyMappedFiles(0);
 
@@ -396,7 +387,7 @@ public class MappedFileQueue {
 
             for (int i = 0; i < mfsLength; i++) {
                 boolean destroy;
-                MappedFile mappedFile = (MappedFile) mfs[i];
+                MappedFile mappedFile = (MappedFile)mfs[i];
                 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                 if (result != null) {
                     long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
@@ -404,7 +395,7 @@ public class MappedFileQueue {
                     destroy = maxOffsetInLogicQueue < offset;
                     if (destroy) {
                         log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
-                                + maxOffsetInLogicQueue + ", delete it");
+                            + maxOffsetInLogicQueue + ", delete it");
                     }
                 } else {
                     log.warn("this being not executed forever.");
@@ -425,7 +416,6 @@ public class MappedFileQueue {
         return deleteCount;
     }
 
-
     public boolean flush(final int flushLeastPages) {
         boolean result = true;
         MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
@@ -467,10 +457,10 @@ public class MappedFileQueue {
         try {
             MappedFile mappedFile = this.getFirstMappedFile();
             if (mappedFile != null) {
-                int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
+                int index = (int)((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
                 if (index < 0 || index >= this.mappedFiles.size()) {
                     LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
-                        "mappedFileSize: {}, mappedFiles count: {}",
+                            "mappedFileSize: {}, mappedFiles count: {}",
                         mappedFile,
                         offset,
                         index,
@@ -494,7 +484,6 @@ public class MappedFileQueue {
         return null;
     }
 
-
     public MappedFile getFirstMappedFile() {
         MappedFile mappedFileFirst = null;
 
@@ -515,14 +504,13 @@ public class MappedFileQueue {
         return findMappedFileByOffset(offset, false);
     }
 
-
     public long getMappedMemorySize() {
         long size = 0;
 
         Object[] mfs = this.copyMappedFiles(0);
         if (mfs != null) {
             for (Object mf : mfs) {
-                if (((ReferenceResource) mf).isAvailable()) {
+                if (((ReferenceResource)mf).isAvailable()) {
                     size += this.mappedFileSize;
                 }
             }
@@ -531,7 +519,6 @@ public class MappedFileQueue {
         return size;
     }
 
-
     public boolean retryDeleteFirstFile(final long intervalForcibly) {
         MappedFile mappedFile = this.getFirstMappedFile();
         if (mappedFile != null) {
@@ -554,14 +541,12 @@ public class MappedFileQueue {
         return false;
     }
 
-
     public void shutdown(final long intervalForcibly) {
         for (MappedFile mf : this.mappedFiles) {
             mf.shutdown(intervalForcibly);
         }
     }
 
-
     public void destroy() {
         for (MappedFile mf : this.mappedFiles) {
             mf.destroy(1000 * 3);
@@ -576,27 +561,22 @@ public class MappedFileQueue {
         }
     }
 
-
     public long getFlushedWhere() {
         return flushedWhere;
     }
 
-
     public void setFlushedWhere(long flushedWhere) {
         this.flushedWhere = flushedWhere;
     }
 
-
     public long getStoreTimestamp() {
         return storeTimestamp;
     }
 
-
     public List<MappedFile> getMappedFiles() {
         return mappedFiles;
     }
 
-
     public int getMappedFileSize() {
         return mappedFileSize;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
index 25304b9..ebc57a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.store;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
index dabb418..4cbdacf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
@@ -6,20 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.message.MessageExt;
 
-
 public class MessageExtBrokerInner extends MessageExt {
     private static final long serialVersionUID = 7256001576878700634L;
     private String propertiesString;
@@ -32,22 +31,18 @@ public class MessageExtBrokerInner extends MessageExt {
         return tags.hashCode();
     }
 
-
     public String getPropertiesString() {
         return propertiesString;
     }
 
-
     public void setPropertiesString(String propertiesString) {
         this.propertiesString = propertiesString;
     }
 
-
     public long getTagsCode() {
         return tagsCode;
     }
 
-
     public void setTagsCode(long tagsCode) {
         this.tagsCode = tagsCode;
     }