You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:31 UTC

[06/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 84a3af6..82fe8f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -16,19 +16,15 @@
  */
 package org.apache.rocketmq.store.ha;
 
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.CommitLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.*;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -36,7 +32,13 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.CommitLog;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HAService {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -56,38 +58,33 @@ public class HAService {
 
     private final HAClient haClient;
 
-
     public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
         this.defaultMessageStore = defaultMessageStore;
         this.acceptSocketService =
-                new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
         this.groupTransferService = new GroupTransferService();
         this.haClient = new HAClient();
     }
 
-
     public void updateMasterAddress(final String newAddr) {
         if (this.haClient != null) {
             this.haClient.updateMasterAddress(newAddr);
         }
     }
 
-
     public void putRequest(final CommitLog.GroupCommitRequest request) {
         this.groupTransferService.putRequest(request);
     }
 
-
     public boolean isSlaveOK(final long masterPutWhere) {
         boolean result = this.connectionCount.get() > 0;
         result =
-                result
-                        && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
-                        .getMessageStoreConfig().getHaSlaveFallbehindMax());
+            result
+                && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
+                .getMessageStoreConfig().getHaSlaveFallbehindMax());
         return result;
     }
 
-
     /**
 
      */
@@ -103,12 +100,10 @@ public class HAService {
         }
     }
 
-
     public AtomicInteger getConnectionCount() {
         return connectionCount;
     }
 
-
     // public void notifyTransferSome() {
     // this.groupTransferService.notifyTransferSome();
     // }
@@ -120,21 +115,18 @@ public class HAService {
         this.haClient.start();
     }
 
-
     public void addConnection(final HAConnection conn) {
         synchronized (this.connectionList) {
             this.connectionList.add(conn);
         }
     }
 
-
     public void removeConnection(final HAConnection conn) {
         synchronized (this.connectionList) {
             this.connectionList.remove(conn);
         }
     }
 
-
     public void shutdown() {
         this.haClient.shutdown();
         this.acceptSocketService.shutdown(true);
@@ -142,7 +134,6 @@ public class HAService {
         this.groupTransferService.shutdown();
     }
 
-
     public void destroyConnections() {
         synchronized (this.connectionList) {
             for (HAConnection c : this.connectionList) {
@@ -153,12 +144,10 @@ public class HAService {
         }
     }
 
-
     public DefaultMessageStore getDefaultMessageStore() {
         return defaultMessageStore;
     }
 
-
     public WaitNotifyObject getWaitNotifyObject() {
         return waitNotifyObject;
     }
@@ -171,9 +160,9 @@ public class HAService {
      * Listens to slave connections to create {@link HAConnection}.
      */
     class AcceptSocketService extends ServiceThread {
+        private final SocketAddress socketAddressListen;
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
-        private final SocketAddress socketAddressListen;
 
         public AcceptSocketService(final int port) {
             this.socketAddressListen = new InetSocketAddress(port);
@@ -181,6 +170,7 @@ public class HAService {
 
         /**
          * Starts listening to slave connections.
+         *
          * @throws Exception If fails.
          */
         public void beginAccept() throws Exception {
@@ -199,8 +189,7 @@ public class HAService {
             try {
                 this.serverSocketChannel.close();
                 this.selector.close();
-            }
-            catch (IOException e) {
+            } catch (IOException e) {
                 log.error("AcceptSocketService shutdown exception", e);
             }
         }
@@ -218,11 +207,11 @@ public class HAService {
                     if (selected != null) {
                         for (SelectionKey k : selected) {
                             if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
-                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+                                SocketChannel sc = ((ServerSocketChannel)k.channel()).accept();
 
                                 if (sc != null) {
                                     HAService.log.info("HAService receive new connection, "
-                                            + sc.socket().getRemoteSocketAddress());
+                                        + sc.socket().getRemoteSocketAddress());
 
                                     try {
                                         HAConnection conn = new HAConnection(HAService.this, sc);
@@ -264,7 +253,6 @@ public class HAService {
         private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
         private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
 
-
         public void putRequest(final CommitLog.GroupCommitRequest request) {
             synchronized (this) {
                 this.requestsWrite.add(request);
@@ -274,19 +262,16 @@ public class HAService {
             }
         }
 
-
         public void notifyTransferSome() {
             this.notifyTransferObject.wakeup();
         }
 
-
         private void swapRequests() {
             List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
             this.requestsWrite = this.requestsRead;
             this.requestsRead = tmp;
         }
 
-
         private void doWaitTransfer() {
             if (!this.requestsRead.isEmpty()) {
                 for (CommitLog.GroupCommitRequest req : this.requestsRead) {
@@ -307,7 +292,6 @@ public class HAService {
             }
         }
 
-
         public void run() {
             log.info(this.getServiceName() + " service started");
 
@@ -323,13 +307,11 @@ public class HAService {
             log.info(this.getServiceName() + " service end");
         }
 
-
         @Override
         protected void onWaitEnd() {
             this.swapRequests();
         }
 
-
         @Override
         public String getServiceName() {
             return GroupTransferService.class.getSimpleName();
@@ -349,12 +331,10 @@ public class HAService {
         private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
         private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
 
-
         public HAClient() throws IOException {
             this.selector = RemotingUtil.openSelector();
         }
 
-
         public void updateMasterAddress(final String newAddr) {
             String currentAddr = this.masterAddress.get();
             if (currentAddr == null || !currentAddr.equals(newAddr)) {
@@ -363,17 +343,15 @@ public class HAService {
             }
         }
 
-
         private boolean isTimeToReportOffset() {
             long interval =
-                    HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
+                HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
             boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
-                    .getHaSendHeartbeatInterval();
+                .getHaSendHeartbeatInterval();
 
             return needHeart;
         }
 
-
         private boolean reportSlaveMaxOffset(final long maxOffset) {
             this.reportOffset.position(0);
             this.reportOffset.limit(8);
@@ -386,7 +364,7 @@ public class HAService {
                     this.socketChannel.write(this.reportOffset);
                 } catch (IOException e) {
                     log.error(this.getServiceName()
-                            + "reportSlaveMaxOffset this.socketChannel.write exception", e);
+                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                     return false;
                 }
             }
@@ -394,7 +372,6 @@ public class HAService {
             return !this.reportOffset.hasRemaining();
         }
 
-
         // private void reallocateByteBuffer() {
         // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
         // int remain = this.byteBufferRead.limit() - this.dispatchPostion;
@@ -423,14 +400,12 @@ public class HAService {
             this.dispatchPostion = 0;
         }
 
-
         private void swapByteBuffer() {
             ByteBuffer tmp = this.byteBufferRead;
             this.byteBufferRead = this.byteBufferBackup;
             this.byteBufferBackup = tmp;
         }
 
-
         private boolean processReadEvent() {
             int readSizeZeroTimes = 0;
             while (this.byteBufferRead.hasRemaining()) {
@@ -462,7 +437,6 @@ public class HAService {
             return true;
         }
 
-
         private boolean dispatchReadRequest() {
             final int msgHeaderSize = 8 + 4; // phyoffset + size
             int readSocketPos = this.byteBufferRead.position();
@@ -475,22 +449,19 @@ public class HAService {
 
                     long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
 
-
                     if (slavePhyOffset != 0) {
                         if (slavePhyOffset != masterPhyOffset) {
                             log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
-                                    + slavePhyOffset + " MASTER: " + masterPhyOffset);
+                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                             return false;
                         }
                     }
 
-
                     if (diff >= (msgHeaderSize + bodySize)) {
                         byte[] bodyData = new byte[bodySize];
                         this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                         this.byteBufferRead.get(bodyData);
 
-
                         HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
 
                         this.byteBufferRead.position(readSocketPos);
@@ -514,7 +485,6 @@ public class HAService {
             return true;
         }
 
-
         private boolean reportSlaveMaxOffsetPlus() {
             boolean result = true;
             long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
@@ -530,7 +500,6 @@ public class HAService {
             return result;
         }
 
-
         private boolean connectMaster() throws ClosedChannelException {
             if (null == socketChannel) {
                 String addr = this.masterAddress.get();
@@ -553,7 +522,6 @@ public class HAService {
             return this.socketChannel != null;
         }
 
-
         private void closeMaster() {
             if (null != this.socketChannel) {
                 try {
@@ -581,7 +549,6 @@ public class HAService {
             }
         }
 
-
         @Override
         public void run() {
             log.info(this.getServiceName() + " service started");
@@ -597,10 +564,8 @@ public class HAService {
                             }
                         }
 
-
                         this.selector.select(1000);
 
-
                         boolean ok = this.processReadEvent();
                         if (!ok) {
                             this.closeMaster();
@@ -610,14 +575,13 @@ public class HAService {
                             continue;
                         }
 
-
                         long interval =
-                                HAService.this.getDefaultMessageStore().getSystemClock().now()
-                                        - this.lastWriteTimestamp;
+                            HAService.this.getDefaultMessageStore().getSystemClock().now()
+                                - this.lastWriteTimestamp;
                         if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
-                                .getHaHousekeepingInterval()) {
+                            .getHaHousekeepingInterval()) {
                             log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
-                                    + "] expired, " + interval);
+                                + "] expired, " + interval);
                             this.closeMaster();
                             log.warn("HAClient, master not response some time, so close connection");
                         }
@@ -633,7 +597,6 @@ public class HAService {
             log.info(this.getServiceName() + " service end");
         }
 
-
         //
         // private void disableWriteFlag() {
         // if (this.socketChannel != null) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index c059e10..a96af5e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -6,27 +6,25 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.ha;
 
 import java.util.HashMap;
 
-
 public class WaitNotifyObject {
 
     protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
-            new HashMap<Long, Boolean>(16);
+        new HashMap<Long, Boolean>(16);
 
     protected volatile boolean hasNotified = false;
 
-
     public void wakeup() {
         synchronized (this) {
             if (!this.hasNotified) {
@@ -36,7 +34,6 @@ public class WaitNotifyObject {
         }
     }
 
-
     protected void waitForRunning(long interval) {
         synchronized (this) {
             if (this.hasNotified) {
@@ -56,7 +53,6 @@ public class WaitNotifyObject {
         }
     }
 
-
     protected void onWaitEnd() {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
index d6a223d..de1a31d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
@@ -6,28 +6,26 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.index;
 
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.MappedFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.util.List;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.MappedFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexFile {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -41,11 +39,10 @@ public class IndexFile {
     private final MappedByteBuffer mappedByteBuffer;
     private final IndexHeader indexHeader;
 
-
     public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
-                     final long endPhyOffset, final long endTimestamp) throws IOException {
+        final long endPhyOffset, final long endTimestamp) throws IOException {
         int fileTotalSize =
-                IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
+            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
         this.mappedFile = new MappedFile(fileName, fileTotalSize);
         this.fileChannel = this.mappedFile.getFileChannel();
         this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
@@ -66,17 +63,14 @@ public class IndexFile {
         }
     }
 
-
     public String getFileName() {
         return this.mappedFile.getFileName();
     }
 
-
     public void load() {
         this.indexHeader.load();
     }
 
-
     public void flush() {
         long beginTime = System.currentTimeMillis();
         if (this.mappedFile.hold()) {
@@ -114,10 +108,8 @@ public class IndexFile {
 
                 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
 
-
                 timeDiff = timeDiff / 1000;
 
-
                 if (this.indexHeader.getBeginTimestamp() <= 0) {
                     timeDiff = 0;
                 } else if (timeDiff > Integer.MAX_VALUE) {
@@ -127,19 +119,16 @@ public class IndexFile {
                 }
 
                 int absIndexPos =
-                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
-                                + this.indexHeader.getIndexCount() * indexSize;
-
+                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+                        + this.indexHeader.getIndexCount() * indexSize;
 
                 this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                 this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
-                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
+                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff);
                 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
 
-
                 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
 
-
                 if (this.indexHeader.getIndexCount() <= 1) {
                     this.indexHeader.setBeginPhyOffset(phyOffset);
                     this.indexHeader.setBeginTimestamp(storeTimestamp);
@@ -192,22 +181,22 @@ public class IndexFile {
 
     public boolean isTimeMatched(final long begin, final long end) {
         boolean result =
-                begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
+            begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
 
         result =
-                result
-                        || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
-                        .getEndTimestamp());
+            result
+                || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
+                .getEndTimestamp());
 
         result =
-                result
-                        || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
-                        .getEndTimestamp());
+            result
+                || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
+                .getEndTimestamp());
         return result;
     }
 
     public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
-                                final long begin, final long end, boolean lock) {
+        final long begin, final long end, boolean lock) {
         if (this.mappedFile.hold()) {
             int keyHash = indexKeyHashMethod(key);
             int slotPos = keyHash % this.hashSlotNum;
@@ -227,25 +216,24 @@ public class IndexFile {
                 // }
 
                 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
-                        || this.indexHeader.getIndexCount() <= 1) {
+                    || this.indexHeader.getIndexCount() <= 1) {
                     // TODO NOTFOUND
                 } else {
-                    for (int nextIndexToRead = slotValue;;) {
+                    for (int nextIndexToRead = slotValue; ; ) {
                         if (phyOffsets.size() >= maxNum) {
                             break;
                         }
 
                         int absIndexPos =
-                                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
-                                        + nextIndexToRead * indexSize;
+                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+                                + nextIndexToRead * indexSize;
 
                         int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                         long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
 
-                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
+                        long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                         int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
 
-
                         if (timeDiff < 0) {
                             break;
                         }
@@ -260,8 +248,8 @@ public class IndexFile {
                         }
 
                         if (prevIndexRead <= invalidIndex
-                                || prevIndexRead > this.indexHeader.getIndexCount()
-                                || prevIndexRead == nextIndexToRead || timeRead < begin) {
+                            || prevIndexRead > this.indexHeader.getIndexCount()
+                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                             break;
                         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
index 130f08e..0c00abd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-
 /**
 
  *
@@ -43,12 +42,10 @@ public class IndexHeader {
 
     private AtomicInteger indexCount = new AtomicInteger(1);
 
-
     public IndexHeader(final ByteBuffer byteBuffer) {
         this.byteBuffer = byteBuffer;
     }
 
-
     public void load() {
         this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
         this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
@@ -72,67 +69,55 @@ public class IndexHeader {
         this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
     }
 
-
     public long getBeginTimestamp() {
         return beginTimestamp.get();
     }
 
-
     public void setBeginTimestamp(long beginTimestamp) {
         this.beginTimestamp.set(beginTimestamp);
         this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp);
     }
 
-
     public long getEndTimestamp() {
         return endTimestamp.get();
     }
 
-
     public void setEndTimestamp(long endTimestamp) {
         this.endTimestamp.set(endTimestamp);
         this.byteBuffer.putLong(endTimestampIndex, endTimestamp);
     }
 
-
     public long getBeginPhyOffset() {
         return beginPhyOffset.get();
     }
 
-
     public void setBeginPhyOffset(long beginPhyOffset) {
         this.beginPhyOffset.set(beginPhyOffset);
         this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset);
     }
 
-
     public long getEndPhyOffset() {
         return endPhyOffset.get();
     }
 
-
     public void setEndPhyOffset(long endPhyOffset) {
         this.endPhyOffset.set(endPhyOffset);
         this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset);
     }
 
-
     public AtomicInteger getHashSlotCount() {
         return hashSlotCount;
     }
 
-
     public void incHashSlotCount() {
         int value = this.hashSlotCount.incrementAndGet();
         this.byteBuffer.putInt(hashSlotcountIndex, value);
     }
 
-
     public int getIndexCount() {
         return indexCount.get();
     }
 
-
     public void incIndexCount() {
         int value = this.indexCount.incrementAndGet();
         this.byteBuffer.putInt(indexCountIndex, value);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 939ba2d..b72ffe9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.store.index;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -26,39 +33,25 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
 public class IndexService {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    /** Maximum times to attempt index file creation. */
+    private static final int MAX_TRY_IDX_CREATE = 3;
     private final DefaultMessageStore defaultMessageStore;
-
     private final int hashSlotNum;
     private final int indexNum;
     private final String storePath;
-
     private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    /** Maximum times to attempt index file creation. */
-    private static final int MAX_TRY_IDX_CREATE = 3;
-
-
     public IndexService(final DefaultMessageStore store) {
         this.defaultMessageStore = store;
         this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
         this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
         this.storePath =
-                StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
+            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
     }
 
-
     public boolean load(final boolean lastExitOK) {
         File dir = new File(this.storePath);
         File[] files = dir.listFiles();
@@ -72,7 +65,7 @@ public class IndexService {
 
                     if (!lastExitOK) {
                         if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
-                                .getIndexMsgTimestamp()) {
+                            .getIndexMsgTimestamp()) {
                             f.destroy(0);
                             continue;
                         }
@@ -113,7 +106,7 @@ public class IndexService {
         if (files != null) {
             List<IndexFile> fileList = new ArrayList<IndexFile>();
             for (int i = 0; i < (files.length - 1); i++) {
-                IndexFile f = (IndexFile) files[i];
+                IndexFile f = (IndexFile)files[i];
                 if (f.getEndPhyOffset() < offset) {
                     fileList.add(f);
                 } else {
@@ -145,7 +138,6 @@ public class IndexService {
         }
     }
 
-
     public void destroy() {
         try {
             this.readWriteLock.readLock().lock();
@@ -160,7 +152,6 @@ public class IndexService {
         }
     }
 
-
     public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
         List<Long> phyOffsets = new ArrayList<Long>(maxNum);
 
@@ -183,7 +174,6 @@ public class IndexService {
                         f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                     }
 
-
                     if (f.getBeginTimestamp() < begin) {
                         break;
                     }
@@ -202,12 +192,10 @@ public class IndexService {
         return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
     }
 
-
     private String buildKey(final String topic, final String key) {
         return topic + "#" + key;
     }
 
-
     public void buildIndex(DispatchRequest req) {
         IndexFile indexFile = retryGetAndCreateIndexFile();
         if (indexFile != null) {
@@ -255,7 +243,6 @@ public class IndexService {
         }
     }
 
-
     private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
         for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
             log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
@@ -300,7 +287,6 @@ public class IndexService {
         return indexFile;
     }
 
-
     public IndexFile getAndCreateLastIndexFile() {
         IndexFile indexFile = null;
         IndexFile prevIndexFile = null;
@@ -323,15 +309,14 @@ public class IndexService {
             this.readWriteLock.readLock().unlock();
         }
 
-
         if (indexFile == null) {
             try {
                 String fileName =
-                        this.storePath + File.separator
-                                + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
+                    this.storePath + File.separator
+                        + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                 indexFile =
-                        new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
-                                lastUpdateIndexTimestamp);
+                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
+                        lastUpdateIndexTimestamp);
                 this.readWriteLock.writeLock().lock();
                 this.indexFileList.add(indexFile);
             } catch (Exception e) {
@@ -340,7 +325,6 @@ public class IndexService {
                 this.readWriteLock.writeLock().unlock();
             }
 
-
             if (indexFile != null) {
                 final IndexFile flushThisFile = prevIndexFile;
                 Thread flushThread = new Thread(new Runnable() {
@@ -358,7 +342,6 @@ public class IndexService {
         return indexFile;
     }
 
-
     public void flush(final IndexFile f) {
         if (null == f)
             return;
@@ -377,12 +360,10 @@ public class IndexService {
         }
     }
 
-
     public void start() {
 
     }
 
-
     public void shutdown() {
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
index e126aee..a864c89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
@@ -6,43 +6,38 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.index;
 
 import java.util.List;
 
-
 public class QueryOffsetResult {
     private final List<Long> phyOffsets;
     private final long indexLastUpdateTimestamp;
     private final long indexLastUpdatePhyoffset;
 
-
     public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp,
-                             long indexLastUpdatePhyoffset) {
+        long indexLastUpdatePhyoffset) {
         this.phyOffsets = phyOffsets;
         this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
         this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
     }
 
-
     public List<Long> getPhyOffsets() {
         return phyOffsets;
     }
 
-
     public long getIndexLastUpdateTimestamp() {
         return indexLastUpdateTimestamp;
     }
 
-
     public long getIndexLastUpdatePhyoffset() {
         return indexLastUpdatePhyoffset;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
index b1520e1..ebe3ffe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -6,34 +6,30 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.schedule;
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 /**
  *
  */
 public class DelayOffsetSerializeWrapper extends RemotingSerializable {
     private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
-            new ConcurrentHashMap<Integer, Long>(32);
-
+        new ConcurrentHashMap<Integer, Long>(32);
 
     public ConcurrentHashMap<Integer, Long> getOffsetTable() {
         return offsetTable;
     }
 
-
     public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) {
         this.offsetTable = offsetTable;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index c03c181..3df4806 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -6,16 +6,23 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.schedule;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,16 +31,16 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 /**
  *
  */
@@ -45,10 +52,10 @@ public class ScheduleMessageService extends ConfigManager {
     private static final long DELAY_FOR_A_PERIOD = 10000L;
 
     private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-            new ConcurrentHashMap<Integer, Long>(32);
+        new ConcurrentHashMap<Integer, Long>(32);
 
     private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
-            new ConcurrentHashMap<Integer, Long>(32);
+        new ConcurrentHashMap<Integer, Long>(32);
 
     private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
 
@@ -56,7 +63,6 @@ public class ScheduleMessageService extends ConfigManager {
 
     private int maxDelayLevel;
 
-
     public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
     }
@@ -65,6 +71,10 @@ public class ScheduleMessageService extends ConfigManager {
         return queueId + 1;
     }
 
+    public static int delayLevel2QueueId(final int delayLevel) {
+        return delayLevel - 1;
+    }
+
     public void buildRunningStats(HashMap<String, String> stats) {
         Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -78,16 +88,10 @@ public class ScheduleMessageService extends ConfigManager {
         }
     }
 
-    public static int delayLevel2QueueId(final int delayLevel) {
-        return delayLevel - 1;
-    }
-
-
     private void updateOffset(int delayLevel, long offset) {
         this.offsetTable.put(delayLevel, offset);
     }
 
-
     public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
         Long time = this.delayLevelTable.get(delayLevel);
         if (time != null) {
@@ -97,7 +101,6 @@ public class ScheduleMessageService extends ConfigManager {
         return storeTimestamp + 1000;
     }
 
-
     public void start() {
 
         for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
@@ -126,17 +129,14 @@ public class ScheduleMessageService extends ConfigManager {
         }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
     }
 
-
     public void shutdown() {
         this.timer.cancel();
     }
 
-
     public int getMaxDelayLevel() {
         return maxDelayLevel;
     }
 
-
     public String encode() {
         return this.encode(false);
     }
@@ -150,14 +150,14 @@ public class ScheduleMessageService extends ConfigManager {
     @Override
     public String configFilePath() {
         return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
-                .getStorePathRootDir());
+            .getStorePathRootDir());
     }
 
     @Override
     public void decode(String jsonString) {
         if (jsonString != null) {
             DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
-                    DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
+                DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
             if (delayOffsetSerializeWrapper != null) {
                 this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
             }
@@ -206,13 +206,11 @@ public class ScheduleMessageService extends ConfigManager {
         private final int delayLevel;
         private final long offset;
 
-
         public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
             this.delayLevel = delayLevel;
             this.offset = offset;
         }
 
-
         @Override
         public void run() {
             try {
@@ -221,11 +219,10 @@ public class ScheduleMessageService extends ConfigManager {
                 // XXX: warn and notify me
                 log.error("ScheduleMessageService, executeOnTimeup exception", e);
                 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
-                        this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
             }
         }
 
-
         /**
 
          *
@@ -243,11 +240,10 @@ public class ScheduleMessageService extends ConfigManager {
             return result;
         }
 
-
         public void executeOnTimeup() {
             ConsumeQueue cq =
-                    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
-                            delayLevel2QueueId(delayLevel));
+                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
+                    delayLevel2QueueId(delayLevel));
 
             long failScheduleOffset = offset;
 
@@ -262,7 +258,6 @@ public class ScheduleMessageService extends ConfigManager {
                             int sizePy = bufferCQ.getByteBuffer().getInt();
                             long tagsCode = bufferCQ.getByteBuffer().getLong();
 
-
                             long now = System.currentTimeMillis();
                             long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 
@@ -272,31 +267,29 @@ public class ScheduleMessageService extends ConfigManager {
 
                             if (countdown <= 0) {
                                 MessageExt msgExt =
-                                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                                offsetPy, sizePy);
+                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
+                                        offsetPy, sizePy);
 
                                 if (msgExt != null) {
                                     try {
                                         MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                         PutMessageResult putMessageResult =
-                                                ScheduleMessageService.this.defaultMessageStore
-                                                        .putMessage(msgInner);
+                                            ScheduleMessageService.this.defaultMessageStore
+                                                .putMessage(msgInner);
 
                                         if (putMessageResult != null
-                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                             continue;
-                                        }
-
-                                        else {
+                                        } else {
                                             // XXX: warn and notify me
                                             log.error(
-                                                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                    msgExt.getTopic(), msgExt.getMsgId());
+                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
+                                                msgExt.getTopic(), msgExt.getMsgId());
                                             ScheduleMessageService.this.timer.schedule(
-                                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                            nextOffset), DELAY_FOR_A_PERIOD);
+                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
+                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                             ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                    nextOffset);
+                                                nextOffset);
                                             return;
                                         }
                                     } catch (Exception e) {
@@ -307,17 +300,15 @@ public class ScheduleMessageService extends ConfigManager {
 
                                          */
                                         log.error(
-                                                "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
-                                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
-                                                        + offsetPy + ",sizePy=" + sizePy, e);
+                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+                                                + offsetPy + ",sizePy=" + sizePy, e);
                                     }
                                 }
-                            }
-
-                            else {
+                            } else {
                                 ScheduleMessageService.this.timer.schedule(
-                                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                        countdown);
+                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
+                                    countdown);
                                 ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                 return;
                             }
@@ -325,7 +316,7 @@ public class ScheduleMessageService extends ConfigManager {
 
                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
-                                this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                         return;
                     } finally {
@@ -342,16 +333,15 @@ public class ScheduleMessageService extends ConfigManager {
                     if (offset < cqMinOffset) {
                         failScheduleOffset = cqMinOffset;
                         log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
-                                + cqMinOffset + ", queueId=" + cq.getQueueId());
+                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                     }
                 }
             } // end of if (cq != null)
 
             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                    failScheduleOffset), DELAY_FOR_A_WHILE);
+                failScheduleOffset), DELAY_FOR_A_WHILE);
         }
 
-
         private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
             MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
             msgInner.setBody(msgExt.getBody());
@@ -360,7 +350,7 @@ public class ScheduleMessageService extends ConfigManager {
 
             TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
             long tagsCodeValue =
-                    MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
             msgInner.setTagsCode(tagsCodeValue);
             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index dd4f6df..cd87b0d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.stats;
 
@@ -21,7 +21,6 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class BrokerStats {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final DefaultMessageStore defaultMessageStore;
@@ -34,12 +33,10 @@ public class BrokerStats {
 
     private volatile long msgGetTotalTodayMorning;
 
-
     public BrokerStats(DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
     }
 
-
     /**
 
      */
@@ -48,60 +45,50 @@ public class BrokerStats {
         this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
 
         this.msgPutTotalTodayMorning =
-                this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+            this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
         this.msgGetTotalTodayMorning =
-                this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+            this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
 
         log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
         log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
     }
 
-
     public long getMsgPutTotalYesterdayMorning() {
         return msgPutTotalYesterdayMorning;
     }
 
-
     public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
         this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
     }
 
-
     public long getMsgPutTotalTodayMorning() {
         return msgPutTotalTodayMorning;
     }
 
-
     public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
         this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
     }
 
-
     public long getMsgGetTotalYesterdayMorning() {
         return msgGetTotalYesterdayMorning;
     }
 
-
     public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
         this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
     }
 
-
     public long getMsgGetTotalTodayMorning() {
         return msgGetTotalTodayMorning;
     }
 
-
     public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
         this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
     }
 
-
     public long getMsgPutTotalTodayNow() {
         return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
     }
 
-
     public long getMsgGetTotalTodayNow() {
         return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index b14780b..f128b09 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -6,16 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.stats;
 
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.stats.MomentStatsItemSet;
@@ -24,11 +27,6 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-
 public class BrokerStatsManager {
 
     public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
@@ -65,9 +63,9 @@ public class BrokerStatsManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
     private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerStatsThread"));
+        "BrokerStatsThread"));
     private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "CommercialStatsThread"));
+        "CommercialStatsThread"));
     private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
     private final String clusterName;
     private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
@@ -89,7 +87,6 @@ public class BrokerStatsManager {
         this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
 
-
         this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
         this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
         this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
@@ -154,42 +151,36 @@ public class BrokerStatsManager {
         this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1);
     }
 
-
     public void incBrokerPutNums() {
         this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
     }
 
-
     public void incBrokerGetNums(final int incValue) {
         this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
     }
 
-
     public void incSendBackNums(final String group, final String topic) {
         final String statsKey = buildStatsKey(topic, group);
         this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
     }
 
-
     public double tpsGroupGetNums(final String group, final String topic) {
         final String statsKey = buildStatsKey(topic, group);
         return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
     }
 
-
     public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) {
         final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
         this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
     }
 
-
     public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) {
         final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
         this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
     }
 
     public void incCommercialValue(final String key, final String owner, final String group,
-                                   final String topic, final String type, final int incValue) {
+        final String topic, final String type, final int incValue) {
         final String statsKey = buildCommercialStatsKey(owner, topic, group, type);
         this.statsTable.get(key).addValue(statsKey, incValue, 1);
     }
@@ -206,7 +197,6 @@ public class BrokerStatsManager {
         return strBuilder.toString();
     }
 
-
     public enum StatsType {
         SEND_SUCCESS,
         SEND_FAILURE,

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
index ee9e68b..dc5d6a9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.util;
 
@@ -22,9 +22,8 @@ import com.sun.jna.NativeLong;
 import com.sun.jna.Platform;
 import com.sun.jna.Pointer;
 
-
 public interface LibC extends Library {
-    LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
+    LibC INSTANCE = (LibC)Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
 
     int MADV_WILLNEED = 3;
     int MADV_DONTNEED = 4;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index bfcb33e..5c6bde2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -6,17 +6,21 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.store;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -26,17 +30,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.junit.Assert.assertTrue;
 
-
 public class DefaultMessageStoreTest {
     private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class);
-    
+
     private static final String StoreMessage = "Once, there was a chance for me!";
 
     private static int QUEUE_TOTAL = 100;
@@ -82,7 +80,7 @@ public class DefaultMessageStoreTest {
                 PutMessageResult result = master.putMessage(buildMessage());
                 logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
             }
-    
+
             for (long i = 0; i < totalMsgs; i++) {
                 GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
                 if (result == null) {
@@ -133,7 +131,7 @@ public class DefaultMessageStoreTest {
                 PutMessageResult result = master.putMessage(buildMessage());
                 logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
             }
-    
+
             for (long i = 0; i < totalMsgs; i++) {
                 GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
                 if (result == null) {
@@ -142,7 +140,7 @@ public class DefaultMessageStoreTest {
                 assertTrue(result != null);
                 result.release();
                 logger.debug("read " + i + " OK");
-        
+
             }
         } finally {
             master.shutdown();
@@ -150,9 +148,9 @@ public class DefaultMessageStoreTest {
         }
         logger.debug("================================================================");
     }
-    
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
-    
+
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
             // Do nothing here

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
index ac1e016..018cf99 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,12 +22,17 @@
  */
 package org.apache.rocketmq.store;
 
-import org.junit.*;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class MappedFileQueueTest {
     private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class);
@@ -55,7 +62,7 @@ public class MappedFileQueueTest {
 
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
-                new MappedFileQueue("target/unit_test_store/a/", 1024, null);
+            new MappedFileQueue("target/unit_test_store/a/", 1024, null);
 
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -73,7 +80,6 @@ public class MappedFileQueueTest {
         logger.debug("MappedFileQueue.getLastMappedFile() OK");
     }
 
-
     @Test
     public void test_findMapedFileByOffset() {
         // four-byte string.
@@ -81,7 +87,7 @@ public class MappedFileQueueTest {
 
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
-                new MappedFileQueue("target/unit_test_store/b/", 1024, null);
+            new MappedFileQueue("target/unit_test_store/b/", 1024, null);
 
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -96,23 +102,23 @@ public class MappedFileQueueTest {
         MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 0);
-        
+
         mappedFile = mappedFileQueue.findMappedFileByOffset(100);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 0);
-        
+
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024);
-        
+
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024);
-        
+
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-        
+
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
@@ -135,7 +141,7 @@ public class MappedFileQueueTest {
 
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
-                new MappedFileQueue("target/unit_test_store/c/", 1024, null);
+            new MappedFileQueue("target/unit_test_store/c/", 1024, null);
 
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -148,27 +154,27 @@ public class MappedFileQueueTest {
         boolean result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere());
-        
+
         result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere());
-        
+
         result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere());
-        
+
         result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere());
-        
+
         result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere());
-        
+
         result = mappedFileQueue.flush(0);
         assertFalse(result);
         assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere());
-        
+
         mappedFileQueue.shutdown(1000);
         mappedFileQueue.destroy();
         logger.debug("MappedFileQueue.flush() OK");
@@ -180,7 +186,7 @@ public class MappedFileQueueTest {
 
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
-                new MappedFileQueue("target/unit_test_store/d/", 1024, null);
+            new MappedFileQueue("target/unit_test_store/d/", 1024, null);
 
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
index bfa09e9..a10f4eb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,6 +22,7 @@
  */
 package org.apache.rocketmq.store;
 
+import java.io.IOException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -27,16 +30,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 import static org.junit.Assert.assertTrue;
 
-
 public class MappedFileTest {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class);
-    
-    
+
     private static final String StoreMessage = "Once, there was a chance for me!";
 
     @BeforeClass
@@ -54,15 +53,15 @@ public class MappedFileTest {
         boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
         assertTrue(result);
         logger.debug("write OK");
-    
+
         SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
         byte[] data = new byte[StoreMessage.length()];
         selectMappedBufferResult.getByteBuffer().get(data);
         String readString = new String(data);
-    
+
         logger.debug("Read: " + readString);
         assertTrue(readString.equals(StoreMessage));
-    
+
         mappedFile.shutdown(1000);
         assertTrue(!mappedFile.isAvailable());
         selectMappedBufferResult.release();
@@ -76,11 +75,11 @@ public class MappedFileTest {
         boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
         assertTrue(result);
         logger.debug("write OK");
-    
+
         SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
         selectMappedBufferResult.release();
         mappedFile.shutdown(1000);
-    
+
         byte[] data = new byte[StoreMessage.length()];
         selectMappedBufferResult.getByteBuffer().get(data);
         String readString = new String(data);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
index f0245a9..2fe4d46 100644
--- a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,15 +22,13 @@
  */
 package org.apache.rocketmq.store;
 
+import java.io.IOException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-
 import static org.junit.Assert.assertTrue;
 
-
 public class StoreCheckpointTest {
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -47,7 +47,7 @@ public class StoreCheckpointTest {
         storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
         storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
         storeCheckpoint.flush();
-    
+
         long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
         assertTrue(diff == 3000);
         storeCheckpoint.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
index 95cf0c8..cdecb20 100644
--- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,16 +22,14 @@
  */
 package org.apache.rocketmq.store.index;
 
-import org.junit.Test;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
 public class IndexFileTest {
     private static final int HASH_SLOT_NUM = 100;
     private static final int INDEX_NUM = 400;
@@ -45,15 +45,14 @@ public class IndexFileTest {
         // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
-    
+
         indexFile.destroy(0);
     }
 
-
     @Test
     public void test_put_get_index() throws Exception {
         IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
-    
+
         for (long i = 0; i < (INDEX_NUM - 1); i++) {
             boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
             assertTrue(putResult);
@@ -62,7 +61,7 @@ public class IndexFileTest {
         // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
-    
+
         final List<Long> phyOffsets = new ArrayList<Long>();
         indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
         assertFalse(phyOffsets.isEmpty());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
index 7ac8233..c5d9756 100644
--- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,19 +22,21 @@
  */
 package org.apache.rocketmq.store.schedule;
 
-import org.apache.rocketmq.store.*;
-import org.apache.rocketmq.store.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.junit.Assert.assertTrue;
 
 @Ignore
@@ -65,7 +69,6 @@ public class ScheduleMessageTest {
         long totalMsgs = 10000;
         QUEUE_TOTAL = 32;
 
-
         MessageBody = StoreMessage.getBytes();
 
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
@@ -79,7 +82,6 @@ public class ScheduleMessageTest {
         boolean load = master.load();
         assertTrue(load);
 
-
         master.start();
         for (int i = 0; i < totalMsgs; i++) {
             MessageExtBrokerInner msg = buildMessage();
@@ -92,7 +94,6 @@ public class ScheduleMessageTest {
         System.out.println("write message over, wait time up");
         Thread.sleep(1000 * 20);
 
-
         for (long i = 0; i < totalMsgs; i++) {
             try {
                 GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
@@ -110,10 +111,8 @@ public class ScheduleMessageTest {
 
         Thread.sleep(1000 * 15);
 
-
         master.shutdown();
 
-
         master.destroy();
         System.out.println("================================================================");
     }