You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/01 05:03:07 UTC

[rocketmq] branch develop updated: [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e1dfc0db5 [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)
e1dfc0db5 is described below

commit e1dfc0db5347059edf31237c0e62889644c0dff9
Author: rongtong <ji...@163.com>
AuthorDate: Sun Jan 1 13:02:53 2023 +0800

    [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)
    
    * Fix the issue that the slave role does not initialize the transientPool in controller mode
    
    * Format the checkstyle
    
    * Remove the useless import
    
    * Fix the HA transmission disconnection issue when transientStorePoolEnable is true
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * just test
    
    * Format the check style
    
    * Format the check style
---
 .../broker/processor/AdminBrokerProcessor.java     |  4 +-
 .../rocketmq/store/AllocateMappedFileService.java  |  4 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  | 50 ++++++++++------------
 .../apache/rocketmq/store/DefaultMessageStore.java | 16 ++++++-
 .../org/apache/rocketmq/store/FlushManager.java    | 36 ++++++++++++++++
 .../apache/rocketmq/store/TransientStorePool.java  | 22 +++++++---
 .../rocketmq/store/config/MessageStoreConfig.java  |  6 +--
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 22 ++++++++++
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 48 +++++++++++----------
 9 files changed, 140 insertions(+), 68 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 12eab475b..24162022c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -174,6 +174,7 @@ import org.apache.rocketmq.remoting.rpc.RpcException;
 import org.apache.rocketmq.remoting.rpc.RpcRequest;
 import org.apache.rocketmq.remoting.rpc.RpcResponse;
 import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -2276,7 +2277,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         }
         MessageStore messageStore = this.brokerController.getMessageStore();
         runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(messageStore.remainTransientStoreBufferNumbs()));
-        if (this.brokerController.getMessageStoreConfig().isTransientStorePoolEnable()) {
+        if (this.brokerController.getMessageStore() instanceof DefaultMessageStore && ((DefaultMessageStore) this.brokerController.getMessageStore()).isTransientStorePoolEnable()) {
             runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToCommit(), false));
         }
         runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToFlush(), false));
@@ -2606,7 +2607,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-
     private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 4d2fc5168..dca7d5325 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -52,7 +52,7 @@ public class AllocateMappedFileService extends ServiceThread {
 
     public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
         int canSubmitRequests = 2;
-        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+        if (this.messageStore.isTransientStorePoolEnable()) {
             if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                 && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                 canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
@@ -171,7 +171,7 @@ public class AllocateMappedFileService extends ServiceThread {
                 long beginTime = System.currentTimeMillis();
 
                 MappedFile mappedFile;
-                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                if (messageStore.isTransientStorePoolEnable()) {
                     try {
                         mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                         mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index c38c2168e..d7e141d31 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -531,7 +531,6 @@ public class CommitLog implements Swappable {
 
             return dispatchRequest;
         } catch (Exception e) {
-            log.error("Check message and return size error", e);
         }
 
         return new DispatchRequest(-1, false /* success */);
@@ -1821,24 +1820,13 @@ public class CommitLog implements Swappable {
 
     }
 
-    interface FlushManager {
-        void start();
-
-        void shutdown();
-
-        void wakeUpFlush();
-
-        void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
-
-        CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
-    }
 
     class DefaultFlushManager implements FlushManager {
 
         private final FlushCommitLogService flushCommitLogService;
 
         //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
-        private final FlushCommitLogService commitLogService;
+        private final FlushCommitLogService commitRealTimeService;
 
         public DefaultFlushManager() {
             if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -1847,15 +1835,14 @@ public class CommitLog implements Swappable {
                 this.flushCommitLogService = new CommitLog.FlushRealTimeService();
             }
 
-            this.commitLogService = new CommitLog.CommitRealTimeService();
+            this.commitRealTimeService = new CommitLog.CommitRealTimeService();
         }
 
-        @Override
-        public void start() {
+        @Override public void start() {
             this.flushCommitLogService.start();
 
-            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
-                this.commitLogService.start();
+            if (defaultMessageStore.isTransientStorePoolEnable()) {
+                this.commitRealTimeService.start();
             }
         }
 
@@ -1870,14 +1857,12 @@ public class CommitLog implements Swappable {
                     CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                     PutMessageStatus flushStatus = null;
                     try {
-                        flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                            TimeUnit.MILLISECONDS);
+                        flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
                     } catch (InterruptedException | ExecutionException | TimeoutException e) {
                         //flushOK=false;
                     }
                     if (flushStatus != PutMessageStatus.PUT_OK) {
-                        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
-                            + " client address: " + messageExt.getBornHostString());
+                        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
                         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                     }
                 } else {
@@ -1886,10 +1871,10 @@ public class CommitLog implements Swappable {
             }
             // Asynchronous flush
             else {
-                if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
                     flushCommitLogService.wakeup();
                 } else {
-                    commitLogService.wakeup();
+                    commitRealTimeService.wakeup();
                 }
             }
         }
@@ -1911,10 +1896,10 @@ public class CommitLog implements Swappable {
             }
             // Asynchronous flush
             else {
-                if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
                     flushCommitLogService.wakeup();
                 } else {
-                    commitLogService.wakeup();
+                    commitRealTimeService.wakeup();
                 }
                 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
             }
@@ -1926,10 +1911,16 @@ public class CommitLog implements Swappable {
             flushCommitLogService.wakeup();
         }
 
+        @Override
+        public void wakeUpCommit() {
+            // now wake up commit log thread.
+            commitRealTimeService.wakeup();
+        }
+
         @Override
         public void shutdown() {
-            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
-                this.commitLogService.shutdown();
+            if (defaultMessageStore.isTransientStorePoolEnable()) {
+                this.commitRealTimeService.shutdown();
             }
 
             this.flushCommitLogService.shutdown();
@@ -1963,4 +1954,7 @@ public class CommitLog implements Swappable {
         this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
     }
 
+    public FlushManager getFlushManager() {
+        return flushManager;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f42960c42..3cf8efdfa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -223,7 +223,7 @@ public class DefaultMessageStore implements MessageStore {
 
         this.reputMessageService = new ReputMessageService();
 
-        this.transientStorePool = new TransientStorePool(messageStoreConfig);
+        this.transientStorePool = new TransientStorePool(this);
 
         this.scheduledExecutorService =
             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -338,7 +338,7 @@ public class DefaultMessageStore implements MessageStore {
             this.haService.init(this);
         }
 
-        if (messageStoreConfig.isTransientStorePoolEnable()) {
+        if (this.isTransientStorePoolEnable()) {
             this.transientStorePool.init();
         }
 
@@ -1067,6 +1067,7 @@ public class DefaultMessageStore implements MessageStore {
         return this.commitLog.getMaxOffset();
     }
 
+
     @Override
     public long getMinPhyOffset() {
         return this.commitLog.getMinOffset();
@@ -2745,4 +2746,15 @@ public class DefaultMessageStore implements MessageStore {
     public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
         DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
     }
+
+    /**
+     * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE or
+     * enableControllerMode is true
+     *
+     * @return <tt>true</tt> or <tt>false</tt>
+     */
+    public boolean isTransientStorePoolEnable() {
+        return this.messageStoreConfig.isTransientStorePoolEnable() &&
+            (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/FlushManager.java b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java
new file mode 100644
index 000000000..fe3951ae7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public interface FlushManager {
+
+    void start();
+
+    void shutdown();
+
+    void wakeUpFlush();
+
+    void wakeUpCommit();
+
+    void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
+
+    CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index a873fe05b..cab7f931f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.util.LibC;
 import sun.nio.ch.DirectBuffer;
 
@@ -34,12 +33,13 @@ public class TransientStorePool {
     private final int poolSize;
     private final int fileSize;
     private final Deque<ByteBuffer> availableBuffers;
-    private final MessageStoreConfig storeConfig;
+    private final DefaultMessageStore messageStore;
+    private volatile boolean isRealCommit;
 
-    public TransientStorePool(final MessageStoreConfig storeConfig) {
-        this.storeConfig = storeConfig;
-        this.poolSize = storeConfig.getTransientStorePoolSize();
-        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
+    public TransientStorePool(final DefaultMessageStore messageStore) {
+        this.messageStore = messageStore;
+        this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
+        this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
         this.availableBuffers = new ConcurrentLinkedDeque<>();
     }
 
@@ -81,9 +81,17 @@ public class TransientStorePool {
     }
 
     public int availableBufferNums() {
-        if (storeConfig.isTransientStorePoolEnable()) {
+        if (messageStore.isTransientStorePoolEnable()) {
             return availableBuffers.size();
         }
         return Integer.MAX_VALUE;
     }
+
+    public boolean isRealCommit() {
+        return isRealCommit;
+    }
+
+    public void setRealCommit(boolean realCommit) {
+        isRealCommit = realCommit;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 91663558e..e29fdc2b0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -965,12 +965,8 @@ public class MessageStoreConfig {
         this.defaultQueryMaxNum = defaultQueryMaxNum;
     }
 
-    /**
-     * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE
-     * @return <tt>true</tt> or <tt>false</tt>
-     */
     public boolean isTransientStorePoolEnable() {
-        return transientStorePoolEnable && BrokerRole.SLAVE != getBrokerRole();
+        return transientStorePoolEnable;
     }
 
     public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c4a9aeb81..f2b421ecd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -136,6 +136,11 @@ public class AutoSwitchHAService extends DefaultHAService {
             }
         }
 
+        if (defaultMessageStore.isTransientStorePoolEnable()) {
+            waitingForAllCommit();
+            defaultMessageStore.getTransientStorePool().setRealCommit(true);
+        }
+
         LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
 
         this.defaultMessageStore.recoverTopicQueueTable();
@@ -162,6 +167,12 @@ public class AutoSwitchHAService extends DefaultHAService {
             this.haClient.updateMasterAddress(newMasterAddr);
             this.haClient.updateHaMasterAddress(null);
             this.haClient.start();
+
+            if (defaultMessageStore.isTransientStorePoolEnable()) {
+                waitingForAllCommit();
+                defaultMessageStore.getTransientStorePool().setRealCommit(false);
+            }
+
             LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
             return true;
         } catch (final Exception e) {
@@ -170,6 +181,17 @@ public class AutoSwitchHAService extends DefaultHAService {
         }
     }
 
+    public void waitingForAllCommit() {
+        while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
+            getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
+            try {
+                Thread.sleep(100);
+            } catch (Exception e) {
+
+            }
+        }
+    }
+
     @Override
     public HAClient getHAClient() {
         return this.haClient;
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 76ba89eba..7b56150f6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -102,7 +102,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     }
 
     public DefaultMappedFile(final String fileName, final int fileSize,
-                             final TransientStorePool transientStorePool) throws IOException {
+        final TransientStorePool transientStorePool) throws IOException {
         init(fileName, fileSize, transientStorePool);
     }
 
@@ -116,7 +116,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
     @Override
     public void init(final String fileName, final int fileSize,
-                     final TransientStorePool transientStorePool) throws IOException {
+        final TransientStorePool transientStorePool) throws IOException {
         init(fileName, fileSize);
         this.writeBuffer = transientStorePool.borrowBuffer();
         this.transientStorePool = transientStorePool;
@@ -186,11 +186,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 }
             } else {
                 log.debug("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
-                        + this.fileFromOffset);
+                    + this.fileFromOffset);
             }
         } else {
             log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
-                    + ", fileFromOffset: " + this.fileFromOffset);
+                + ", fileFromOffset: " + this.fileFromOffset);
         }
 
         return false;
@@ -225,18 +225,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
     @Override
     public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
-                                             PutMessageContext putMessageContext) {
+        PutMessageContext putMessageContext) {
         return appendMessagesInner(msg, cb, putMessageContext);
     }
 
     @Override
     public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
-                                              PutMessageContext putMessageContext) {
+        PutMessageContext putMessageContext) {
         return appendMessagesInner(messageExtBatch, cb, putMessageContext);
     }
 
     public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
-                                                   PutMessageContext putMessageContext) {
+        PutMessageContext putMessageContext) {
         assert messageExt != null;
         assert cb != null;
 
@@ -249,11 +249,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
             if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
                 // traditional batch message
                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
-                        (MessageExtBatch) messageExt, putMessageContext);
+                    (MessageExtBatch) messageExt, putMessageContext);
             } else if (messageExt instanceof MessageExtBrokerInner) {
                 // traditional single message or newly introduced inner-batch message
                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
-                        (MessageExtBrokerInner) messageExt, putMessageContext);
+                    (MessageExtBrokerInner) messageExt, putMessageContext);
             } else {
                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
             }
@@ -364,7 +364,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
             //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
             return WROTE_POSITION_UPDATER.get(this);
         }
-        if (this.isAbleToCommit(commitLeastPages)) {
+
+        //no need to commit data to file channel, so just set committedPosition to wrotePosition.
+        if (transientStorePool != null && !transientStorePool.isRealCommit()) {
+            COMMITTED_POSITION_UPDATER.set(this, WROTE_POSITION_UPDATER.get(this));
+        } else if (this.isAbleToCommit(commitLeastPages)) {
             if (this.hold()) {
                 commit0();
                 this.release();
@@ -459,11 +463,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
             } else {
                 log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
-                        + this.fileFromOffset);
+                    + this.fileFromOffset);
             }
         } else {
             log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
-                    + ", fileFromOffset: " + this.fileFromOffset);
+                + ", fileFromOffset: " + this.fileFromOffset);
         }
 
         return null;
@@ -491,13 +495,13 @@ public class DefaultMappedFile extends AbstractMappedFile {
     public boolean cleanup(final long currentRef) {
         if (this.isAvailable()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                    + " have not shutdown, stop unmapping.");
+                + " have not shutdown, stop unmapping.");
             return false;
         }
 
         if (this.isCleanupOver()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                    + " have cleanup, do not do it again.");
+                + " have cleanup, do not do it again.");
             return true;
         }
 
@@ -523,10 +527,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 long beginTime = System.currentTimeMillis();
                 boolean result = this.file.delete();
                 log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
-                        + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
-                        + this.getFlushedPosition() + ", "
-                        + UtilAll.computeElapsedTimeMilliseconds(beginTime)
-                        + "," + (System.currentTimeMillis() - lastModified));
+                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+                    + this.getFlushedPosition() + ", "
+                    + UtilAll.computeElapsedTimeMilliseconds(beginTime)
+                    + "," + (System.currentTimeMillis() - lastModified));
             } catch (Exception e) {
                 log.warn("close file channel " + this.fileName + " Failed. ", e);
             }
@@ -534,7 +538,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
             return true;
         } else {
             log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
-                    + " Failed. cleanupOver: " + this.cleanupOver);
+                + " Failed. cleanupOver: " + this.cleanupOver);
         }
 
         return false;
@@ -555,7 +559,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
      */
     @Override
     public int getReadPosition() {
-        return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
+        return transientStorePool == null || !transientStorePool.isRealCommit() ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
     }
 
     @Override
@@ -596,11 +600,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
         // force flush when prepare load finished
         if (type == FlushDiskType.SYNC_FLUSH) {
             log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
-                    this.getFileName(), System.currentTimeMillis() - beginTime);
+                this.getFileName(), System.currentTimeMillis() - beginTime);
             mappedByteBuffer.force();
         }
         log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
-                System.currentTimeMillis() - beginTime);
+            System.currentTimeMillis() - beginTime);
 
         this.mlock();
     }