You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/01 05:03:07 UTC
[rocketmq] branch develop updated: [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e1dfc0db5 [ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)
e1dfc0db5 is described below
commit e1dfc0db5347059edf31237c0e62889644c0dff9
Author: rongtong <ji...@163.com>
AuthorDate: Sun Jan 1 13:02:53 2023 +0800
[ISSUE #5714] Fix the issue that broker can't work normally when transientStorePool=true in controller mode (#5722)
* Fix the issue that the slave role does not initialize the transientPool in controller mode
* Format the checkstyle
* Remove the useless import
* Fix the HA transmission disconnection issue when transientStorePoolEnable is true
* just test
* just test
* just test
* just test
* just test
* just test
* just test
* just test
* just test
* Format the check style
* Format the check style
---
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../rocketmq/store/AllocateMappedFileService.java | 4 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 50 ++++++++++------------
.../apache/rocketmq/store/DefaultMessageStore.java | 16 ++++++-
.../org/apache/rocketmq/store/FlushManager.java | 36 ++++++++++++++++
.../apache/rocketmq/store/TransientStorePool.java | 22 +++++++---
.../rocketmq/store/config/MessageStoreConfig.java | 6 +--
.../store/ha/autoswitch/AutoSwitchHAService.java | 22 ++++++++++
.../rocketmq/store/logfile/DefaultMappedFile.java | 48 +++++++++++----------
9 files changed, 140 insertions(+), 68 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 12eab475b..24162022c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -174,6 +174,7 @@ import org.apache.rocketmq.remoting.rpc.RpcException;
import org.apache.rocketmq.remoting.rpc.RpcRequest;
import org.apache.rocketmq.remoting.rpc.RpcResponse;
import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
@@ -2276,7 +2277,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
MessageStore messageStore = this.brokerController.getMessageStore();
runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(messageStore.remainTransientStoreBufferNumbs()));
- if (this.brokerController.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ if (this.brokerController.getMessageStore() instanceof DefaultMessageStore && ((DefaultMessageStore) this.brokerController.getMessageStore()).isTransientStorePoolEnable()) {
runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToCommit(), false));
}
runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(messageStore.remainHowManyDataToFlush(), false));
@@ -2606,7 +2607,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
-
private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 4d2fc5168..dca7d5325 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -52,7 +52,7 @@ public class AllocateMappedFileService extends ServiceThread {
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
- if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
@@ -171,7 +171,7 @@ public class AllocateMappedFileService extends ServiceThread {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
- if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index c38c2168e..d7e141d31 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -531,7 +531,6 @@ public class CommitLog implements Swappable {
return dispatchRequest;
} catch (Exception e) {
- log.error("Check message and return size error", e);
}
return new DispatchRequest(-1, false /* success */);
@@ -1821,24 +1820,13 @@ public class CommitLog implements Swappable {
}
- interface FlushManager {
- void start();
-
- void shutdown();
-
- void wakeUpFlush();
-
- void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
-
- CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
- }
class DefaultFlushManager implements FlushManager {
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
- private final FlushCommitLogService commitLogService;
+ private final FlushCommitLogService commitRealTimeService;
public DefaultFlushManager() {
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -1847,15 +1835,14 @@ public class CommitLog implements Swappable {
this.flushCommitLogService = new CommitLog.FlushRealTimeService();
}
- this.commitLogService = new CommitLog.CommitRealTimeService();
+ this.commitRealTimeService = new CommitLog.CommitRealTimeService();
}
- @Override
- public void start() {
+ @Override public void start() {
this.flushCommitLogService.start();
- if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- this.commitLogService.start();
+ if (defaultMessageStore.isTransientStorePoolEnable()) {
+ this.commitRealTimeService.start();
}
}
@@ -1870,14 +1857,12 @@ public class CommitLog implements Swappable {
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
- flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
- TimeUnit.MILLISECONDS);
+ flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
- log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
- + " client address: " + messageExt.getBornHostString());
+ log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
@@ -1886,10 +1871,10 @@ public class CommitLog implements Swappable {
}
// Asynchronous flush
else {
- if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
- commitLogService.wakeup();
+ commitRealTimeService.wakeup();
}
}
}
@@ -1911,10 +1896,10 @@ public class CommitLog implements Swappable {
}
// Asynchronous flush
else {
- if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
- commitLogService.wakeup();
+ commitRealTimeService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
@@ -1926,10 +1911,16 @@ public class CommitLog implements Swappable {
flushCommitLogService.wakeup();
}
+ @Override
+ public void wakeUpCommit() {
+ // now wake up commit log thread.
+ commitRealTimeService.wakeup();
+ }
+
@Override
public void shutdown() {
- if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- this.commitLogService.shutdown();
+ if (defaultMessageStore.isTransientStorePoolEnable()) {
+ this.commitRealTimeService.shutdown();
}
this.flushCommitLogService.shutdown();
@@ -1963,4 +1954,7 @@ public class CommitLog implements Swappable {
this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
}
+ public FlushManager getFlushManager() {
+ return flushManager;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f42960c42..3cf8efdfa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -223,7 +223,7 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService = new ReputMessageService();
- this.transientStorePool = new TransientStorePool(messageStoreConfig);
+ this.transientStorePool = new TransientStorePool(this);
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -338,7 +338,7 @@ public class DefaultMessageStore implements MessageStore {
this.haService.init(this);
}
- if (messageStoreConfig.isTransientStorePoolEnable()) {
+ if (this.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
@@ -1067,6 +1067,7 @@ public class DefaultMessageStore implements MessageStore {
return this.commitLog.getMaxOffset();
}
+
@Override
public long getMinPhyOffset() {
return this.commitLog.getMinOffset();
@@ -2745,4 +2746,15 @@ public class DefaultMessageStore implements MessageStore {
public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this);
}
+
+ /**
+ * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE or
+ * enableControllerMode is true
+ *
+ * @return <tt>true</tt> or <tt>false</tt>
+ */
+ public boolean isTransientStorePoolEnable() {
+ return this.messageStoreConfig.isTransientStorePoolEnable() &&
+ (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/FlushManager.java b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java
new file mode 100644
index 000000000..fe3951ae7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/FlushManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public interface FlushManager {
+
+ void start();
+
+ void shutdown();
+
+ void wakeUpFlush();
+
+ void wakeUpCommit();
+
+ void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
+
+ CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index a873fe05b..cab7f931f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.util.LibC;
import sun.nio.ch.DirectBuffer;
@@ -34,12 +33,13 @@ public class TransientStorePool {
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
- private final MessageStoreConfig storeConfig;
+ private final DefaultMessageStore messageStore;
+ private volatile boolean isRealCommit;
- public TransientStorePool(final MessageStoreConfig storeConfig) {
- this.storeConfig = storeConfig;
- this.poolSize = storeConfig.getTransientStorePoolSize();
- this.fileSize = storeConfig.getMappedFileSizeCommitLog();
+ public TransientStorePool(final DefaultMessageStore messageStore) {
+ this.messageStore = messageStore;
+ this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
+ this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
@@ -81,9 +81,17 @@ public class TransientStorePool {
}
public int availableBufferNums() {
- if (storeConfig.isTransientStorePoolEnable()) {
+ if (messageStore.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}
+
+ public boolean isRealCommit() {
+ return isRealCommit;
+ }
+
+ public void setRealCommit(boolean realCommit) {
+ isRealCommit = realCommit;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 91663558e..e29fdc2b0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -965,12 +965,8 @@ public class MessageStoreConfig {
this.defaultQueryMaxNum = defaultQueryMaxNum;
}
- /**
- * Enable transient commitLog store pool only if transientStorePoolEnable is true and broker role is not SLAVE
- * @return <tt>true</tt> or <tt>false</tt>
- */
public boolean isTransientStorePoolEnable() {
- return transientStorePoolEnable && BrokerRole.SLAVE != getBrokerRole();
+ return transientStorePoolEnable;
}
public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index c4a9aeb81..f2b421ecd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -136,6 +136,11 @@ public class AutoSwitchHAService extends DefaultHAService {
}
}
+ if (defaultMessageStore.isTransientStorePoolEnable()) {
+ waitingForAllCommit();
+ defaultMessageStore.getTransientStorePool().setRealCommit(true);
+ }
+
LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
this.defaultMessageStore.recoverTopicQueueTable();
@@ -162,6 +167,12 @@ public class AutoSwitchHAService extends DefaultHAService {
this.haClient.updateMasterAddress(newMasterAddr);
this.haClient.updateHaMasterAddress(null);
this.haClient.start();
+
+ if (defaultMessageStore.isTransientStorePoolEnable()) {
+ waitingForAllCommit();
+ defaultMessageStore.getTransientStorePool().setRealCommit(false);
+ }
+
LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
return true;
} catch (final Exception e) {
@@ -170,6 +181,17 @@ public class AutoSwitchHAService extends DefaultHAService {
}
}
+ public void waitingForAllCommit() {
+ while (getDefaultMessageStore().remainHowManyDataToCommit() > 0) {
+ getDefaultMessageStore().getCommitLog().getFlushManager().wakeUpCommit();
+ try {
+ Thread.sleep(100);
+ } catch (Exception e) {
+
+ }
+ }
+ }
+
@Override
public HAClient getHAClient() {
return this.haClient;
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 76ba89eba..7b56150f6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -102,7 +102,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
public DefaultMappedFile(final String fileName, final int fileSize,
- final TransientStorePool transientStorePool) throws IOException {
+ final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
@@ -116,7 +116,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
@Override
public void init(final String fileName, final int fileSize,
- final TransientStorePool transientStorePool) throws IOException {
+ final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
@@ -186,11 +186,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
} else {
log.debug("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
- + this.fileFromOffset);
+ + this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
- + ", fileFromOffset: " + this.fileFromOffset);
+ + ", fileFromOffset: " + this.fileFromOffset);
}
return false;
@@ -225,18 +225,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
@Override
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext putMessageContext) {
return appendMessagesInner(msg, cb, putMessageContext);
}
@Override
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext putMessageContext) {
return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
@@ -249,11 +249,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
// traditional batch message
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
- (MessageExtBatch) messageExt, putMessageContext);
+ (MessageExtBatch) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBrokerInner) {
// traditional single message or newly introduced inner-batch message
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
- (MessageExtBrokerInner) messageExt, putMessageContext);
+ (MessageExtBrokerInner) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
@@ -364,7 +364,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return WROTE_POSITION_UPDATER.get(this);
}
- if (this.isAbleToCommit(commitLeastPages)) {
+
+ //no need to commit data to file channel, so just set committedPosition to wrotePosition.
+ if (transientStorePool != null && !transientStorePool.isRealCommit()) {
+ COMMITTED_POSITION_UPDATER.set(this, WROTE_POSITION_UPDATER.get(this));
+ } else if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
@@ -459,11 +463,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
- + this.fileFromOffset);
+ + this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
- + ", fileFromOffset: " + this.fileFromOffset);
+ + ", fileFromOffset: " + this.fileFromOffset);
}
return null;
@@ -491,13 +495,13 @@ public class DefaultMappedFile extends AbstractMappedFile {
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have not shutdown, stop unmapping.");
+ + " have not shutdown, stop unmapping.");
return false;
}
if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have cleanup, do not do it again.");
+ + " have cleanup, do not do it again.");
return true;
}
@@ -523,10 +527,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
- + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
- + this.getFlushedPosition() + ", "
- + UtilAll.computeElapsedTimeMilliseconds(beginTime)
- + "," + (System.currentTimeMillis() - lastModified));
+ + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ + this.getFlushedPosition() + ", "
+ + UtilAll.computeElapsedTimeMilliseconds(beginTime)
+ + "," + (System.currentTimeMillis() - lastModified));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
@@ -534,7 +538,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
- + " Failed. cleanupOver: " + this.cleanupOver);
+ + " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
@@ -555,7 +559,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
*/
@Override
public int getReadPosition() {
- return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
+ return transientStorePool == null || !transientStorePool.isRealCommit() ? WROTE_POSITION_UPDATER.get(this) : COMMITTED_POSITION_UPDATER.get(this);
}
@Override
@@ -596,11 +600,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
- this.getFileName(), System.currentTimeMillis() - beginTime);
+ this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
- System.currentTimeMillis() - beginTime);
+ System.currentTimeMillis() - beginTime);
this.mlock();
}