You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:33 UTC
[08/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 6326d4b..d9e2f03 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -16,14 +16,12 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConsumeQueue {
@@ -43,13 +41,12 @@ public class ConsumeQueue {
private long maxPhysicOffset = -1;
private volatile long minLogicOffset = 0;
-
public ConsumeQueue(
- final String topic,
- final int queueId,
- final String storePath,
- final int mappedFileSize,
- final DefaultMessageStore defaultMessageStore) {
+ final String topic,
+ final int queueId,
+ final String storePath,
+ final int mappedFileSize,
+ final DefaultMessageStore defaultMessageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.defaultMessageStore = defaultMessageStore;
@@ -58,22 +55,20 @@ public class ConsumeQueue {
this.queueId = queueId;
String queueDir = this.storePath
- + File.separator + topic
- + File.separator + queueId;
+ + File.separator + topic
+ + File.separator + queueId;
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
}
-
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
return result;
}
-
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
@@ -98,18 +93,17 @@ public class ConsumeQueue {
this.maxPhysicOffset = offset;
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
- + offset + " " + size + " " + tagsCode);
+ + offset + " " + size + " " + tagsCode);
break;
}
}
-
if (mapedFileOffset == mapedFileSizeLogics) {
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
- + mappedFile.getFileName());
+ + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
@@ -120,7 +114,7 @@ public class ConsumeQueue {
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
- + (processOffset + mapedFileOffset));
+ + (processOffset + mapedFileOffset));
break;
}
}
@@ -137,8 +131,8 @@ public class ConsumeQueue {
if (mappedFile != null) {
long offset = 0;
int low =
- minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile
- .getFileFromOffset()) : 0;
+ minLogicOffset > mappedFile.getFileFromOffset() ? (int)(minLogicOffset - mappedFile
+ .getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
@@ -160,7 +154,7 @@ public class ConsumeQueue {
}
long storeTime =
- this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
@@ -189,8 +183,8 @@ public class ConsumeQueue {
offset = leftOffset;
} else {
offset =
- Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- - rightIndexValue) ? rightOffset : leftOffset;
+ Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
+ - rightIndexValue) ? rightOffset : leftOffset;
}
}
@@ -223,7 +217,6 @@ public class ConsumeQueue {
int size = byteBuffer.getInt();
byteBuffer.getLong();
-
if (0 == i) {
if (offset >= phyOffet) {
this.mappedFileQueue.deleteLastMappedFile();
@@ -249,7 +242,6 @@ public class ConsumeQueue {
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
-
if (pos == logicFileSize) {
return;
}
@@ -283,7 +275,6 @@ public class ConsumeQueue {
int size = byteBuffer.getInt();
byteBuffer.getLong();
-
if (offset >= 0 && size > 0) {
lastOffset = offset + size;
} else {
@@ -295,12 +286,10 @@ public class ConsumeQueue {
return lastOffset;
}
-
public boolean flush(final int flushLeastPages) {
return this.mappedFileQueue.flush(flushLeastPages);
}
-
public int deleteExpiredFile(long offset) {
int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
this.correctMinOffset(offset);
@@ -322,7 +311,7 @@ public class ConsumeQueue {
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: "
- + this.topic + ", queueId: " + this.queueId);
+ + this.topic + ", queueId: " + this.queueId);
break;
}
}
@@ -335,14 +324,12 @@ public class ConsumeQueue {
}
}
-
public long getMinOffsetInQuque() {
return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
}
-
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
- long logicOffset) {
+ long logicOffset) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
@@ -353,7 +340,7 @@ public class ConsumeQueue {
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
- + " failed, retry " + i + " times");
+ + " failed, retry " + i + " times");
try {
Thread.sleep(1000);
@@ -369,7 +356,7 @@ public class ConsumeQueue {
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
- final long cqOffset) {
+ final long cqOffset) {
if (offset <= this.maxPhysicOffset) {
return true;
@@ -392,19 +379,19 @@ public class ConsumeQueue {
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
- + mappedFile.getWrotePosition());
+ + mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
- "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
- expectLogicOffset,
- currentLogicOffset,
- this.topic,
- this.queueId,
- expectLogicOffset - currentLogicOffset
+ "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+ expectLogicOffset,
+ currentLogicOffset,
+ this.topic,
+ this.queueId,
+ expectLogicOffset - currentLogicOffset
);
}
}
@@ -414,14 +401,13 @@ public class ConsumeQueue {
return false;
}
-
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
byteBuffer.putLong(0L);
byteBuffer.putInt(Integer.MAX_VALUE);
byteBuffer.putLong(0L);
- int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
+ int until = (int)(untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
}
@@ -433,7 +419,7 @@ public class ConsumeQueue {
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
- SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
+ SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize));
return result;
}
}
@@ -480,12 +466,10 @@ public class ConsumeQueue {
return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque();
}
-
public long getMaxOffsetInQuque() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}
-
public void checkSelf() {
mappedFileQueue.checkSelf();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
index ac149f4..4ebcb3e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-
public class DefaultMessageFilter implements MessageFilter {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 8714055..7e3af19 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,7 +16,27 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -34,21 +54,8 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
-
public class DefaultMessageStore implements MessageStore {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -84,7 +91,7 @@ public class DefaultMessageStore implements MessageStore {
private final SystemClock systemClock = new SystemClock();
private final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
private final MessageArrivingListener messageArrivingListener;
private final BrokerConfig brokerConfig;
@@ -96,7 +103,7 @@ public class DefaultMessageStore implements MessageStore {
private AtomicLong printTimes = new AtomicLong(0);
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
- final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+ final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
@@ -122,13 +129,11 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool.init();
}
-
this.allocateMappedFileService.start();
this.indexService.start();
}
-
public void truncateDirtyLogicFiles(long phyOffset) {
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
@@ -139,7 +144,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
/**
* @throws IOException
*/
@@ -162,11 +166,10 @@ public class DefaultMessageStore implements MessageStore {
if (result) {
this.storeCheckpoint =
- new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+ new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
-
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
@@ -191,7 +194,6 @@ public class DefaultMessageStore implements MessageStore {
this.commitLog.start();
this.storeStatsService.start();
-
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
}
@@ -293,19 +295,16 @@ public class DefaultMessageStore implements MessageStore {
this.printTimes.set(0);
}
-
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
-
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
-
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
@@ -332,7 +331,7 @@ public class DefaultMessageStore implements MessageStore {
long diff = this.systemClock.now() - begin;
if (diff < 10000000 //
- && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
+ && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
return true;
}
@@ -353,7 +352,7 @@ public class DefaultMessageStore implements MessageStore {
}
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
- final SubscriptionData subscriptionData) {
+ final SubscriptionData subscriptionData) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
@@ -366,7 +365,6 @@ public class DefaultMessageStore implements MessageStore {
long beginTime = this.getSystemClock().now();
-
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
@@ -374,7 +372,6 @@ public class DefaultMessageStore implements MessageStore {
GetMessageResult getResult = new GetMessageResult();
-
final long maxOffsetPy = this.commitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
@@ -417,21 +414,18 @@ public class DefaultMessageStore implements MessageStore {
maxPhyOffsetPulling = offsetPy;
-
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
-
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
- isInDisk)) {
+ isInDisk)) {
break;
}
-
if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (selectResult != null) {
@@ -444,7 +438,6 @@ public class DefaultMessageStore implements MessageStore {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
-
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
}
} else {
@@ -458,7 +451,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
@@ -466,10 +458,9 @@ public class DefaultMessageStore implements MessageStore {
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
long diff = maxOffsetPy - maxPhyOffsetPulling;
- long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
- * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+ long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
+ * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
@@ -479,7 +470,7 @@ public class DefaultMessageStore implements MessageStore {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
- + maxOffset + ", but access logic queue failed.");
+ + maxOffset + ", but access logic queue failed.");
}
}
} else {
@@ -605,7 +596,6 @@ public class DefaultMessageStore implements MessageStore {
}
-
{
String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
@@ -613,7 +603,6 @@ public class DefaultMessageStore implements MessageStore {
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}
-
{
if (this.scheduleMessageService != null) {
this.scheduleMessageService.buildRunningStats(result);
@@ -741,7 +730,6 @@ public class DefaultMessageStore implements MessageStore {
break;
}
-
Collections.sort(queryOffsetResult.getPhyOffsets());
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
@@ -784,12 +772,10 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
if (queryMessageResult.getBufferTotalSize() > 0) {
break;
}
-
if (lastQueryMsgTime < begin) {
break;
}
@@ -825,8 +811,8 @@ public class DefaultMessageStore implements MessageStore {
for (ConsumeQueue cq : queueTable.values()) {
cq.destroy();
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
- cq.getTopic(), //
- cq.getQueueId() //
+ cq.getTopic(), //
+ cq.getQueueId() //
);
this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
@@ -854,23 +840,22 @@ public class DefaultMessageStore implements MessageStore {
Entry<Integer, ConsumeQueue> nextQT = itQT.next();
long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
-
if (maxCLOffsetInConsumeQueue == -1) {
log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", //
- nextQT.getValue().getTopic(), //
- nextQT.getValue().getQueueId(), //
- nextQT.getValue().getMaxPhysicOffset(), //
- nextQT.getValue().getMinLogicOffset());
+ nextQT.getValue().getTopic(), //
+ nextQT.getValue().getQueueId(), //
+ nextQT.getValue().getMaxPhysicOffset(), //
+ nextQT.getValue().getMinLogicOffset());
} else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
log.info(
- "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
- topic, //
- nextQT.getKey(), //
- minCommitLogOffset, //
- maxCLOffsetInConsumeQueue);
+ "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
+ topic, //
+ nextQT.getKey(), //
+ minCommitLogOffset, //
+ maxCLOffsetInConsumeQueue);
DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
- nextQT.getValue().getQueueId());
+ nextQT.getValue().getQueueId());
nextQT.getValue().destroy();
itQT.remove();
@@ -910,7 +895,7 @@ public class DefaultMessageStore implements MessageStore {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId =
- MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+ MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
messageIds.put(msgId, nextOffset++);
if (nextOffset > maxOffset) {
return messageIds;
@@ -1006,11 +991,11 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(//
- topic, //
- queueId, //
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
- this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
- this);
+ topic, //
+ queueId, //
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
+ this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
+ this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
@@ -1031,7 +1016,7 @@ public class DefaultMessageStore implements MessageStore {
}
private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
- long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+ long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
return (maxOffsetPy - offsetPy) > memory;
}
@@ -1045,7 +1030,6 @@ public class DefaultMessageStore implements MessageStore {
return true;
}
-
if (isInDisk) {
if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
return true;
@@ -1093,7 +1077,6 @@ public class DefaultMessageStore implements MessageStore {
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -1101,7 +1084,6 @@ public class DefaultMessageStore implements MessageStore {
}
}, 1, 10, TimeUnit.MINUTES);
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -1113,7 +1095,7 @@ public class DefaultMessageStore implements MessageStore {
String stack = UtilAll.jstack();
final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
- + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
+ + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
MixAll.string2FileNotSafe(stack, fileName);
}
}
@@ -1174,11 +1156,11 @@ public class DefaultMessageStore implements MessageStore {
continue;
}
ConsumeQueue logic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
- this);
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
+ this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
@@ -1196,7 +1178,6 @@ public class DefaultMessageStore implements MessageStore {
private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue();
-
if (lastExitOK) {
this.commitLog.recoverNormally();
} else {
@@ -1285,7 +1266,7 @@ public class DefaultMessageStore implements MessageStore {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
- req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
+ req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
@@ -1298,7 +1279,7 @@ public class DefaultMessageStore implements MessageStore {
}
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
- long logicOffset) {
+ long logicOffset) {
ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}
@@ -1311,23 +1292,21 @@ public class DefaultMessageStore implements MessageStore {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
private final double diskSpaceWarningLevelRatio =
- Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+ Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
private final double diskSpaceCleanForciblyRatio =
- Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+ Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
private long lastRedeleteTimestamp = 0;
private volatile int manualDeleteFileSeveralTimes = 0;
private volatile boolean cleanImmediately = false;
-
public void excuteDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
}
-
public void run() {
try {
this.deleteExpiredFiles();
@@ -1348,27 +1327,24 @@ public class DefaultMessageStore implements MessageStore {
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
-
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
-
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
- fileReservedTime, //
- timeup, //
- spacefull, //
- manualDeleteFileSeveralTimes, //
- cleanAtOnce);
-
+ fileReservedTime, //
+ timeup, //
+ spacefull, //
+ manualDeleteFileSeveralTimes, //
+ cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
- destroyMapedFileIntervalForcibly, cleanAtOnce);
+ destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
@@ -1382,7 +1358,7 @@ public class DefaultMessageStore implements MessageStore {
if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
this.lastRedeleteTimestamp = currentTimestamp;
int destroyMapedFileIntervalForcibly =
- DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+ DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
}
}
@@ -1407,7 +1383,6 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false;
-
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
@@ -1433,10 +1408,9 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
{
String storePathLogics = StorePathConfigHelper
- .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+ .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
@@ -1490,7 +1464,6 @@ public class DefaultMessageStore implements MessageStore {
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
-
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
@@ -1506,7 +1479,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
@@ -1520,7 +1492,6 @@ public class DefaultMessageStore implements MessageStore {
private static final int RETRY_TIMES_OVER = 3;
private long lastFlushTimestamp = 0;
-
private void doFlush(int retryTimes) {
int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
@@ -1530,7 +1501,6 @@ public class DefaultMessageStore implements MessageStore {
long logicsMsgTimestamp = 0;
-
int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
@@ -1558,7 +1528,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
@@ -1572,19 +1541,16 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
this.doFlush(RETRY_TIMES_OVER);
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return FlushConsumeQueueService.class.getSimpleName();
}
-
@Override
public long getJointime() {
return 1000 * 60;
@@ -1610,7 +1576,7 @@ public class DefaultMessageStore implements MessageStore {
if (this.isCommitLogAvailable()) {
log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
- DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
+ DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
}
super.shutdown();
@@ -1624,17 +1590,15 @@ public class DefaultMessageStore implements MessageStore {
return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}
-
private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
-
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
- && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
+ && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
@@ -1645,7 +1609,7 @@ public class DefaultMessageStore implements MessageStore {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
- DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+ DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
@@ -1653,20 +1617,20 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
- && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
+ && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode());
+ dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+ dispatchRequest.getTagsCode());
}
// FIXED BUG By shijia
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
+ .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .addAndGet(dispatchRequest.getMsgSize());
+ .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+ .addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
@@ -1674,7 +1638,6 @@ public class DefaultMessageStore implements MessageStore {
}
} else if (!dispatchRequest.isSuccess()) {
-
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
@@ -1682,7 +1645,7 @@ public class DefaultMessageStore implements MessageStore {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
- this.reputFromOffset);
+ this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
@@ -1698,7 +1661,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
@@ -1715,13 +1677,11 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return ReputMessageService.class.getSimpleName();
}
-
}
public int remainTransientStoreBufferNumbs() {
@@ -1733,7 +1693,6 @@ public class DefaultMessageStore implements MessageStore {
return remainTransientStoreBufferNumbs() == 0;
}
-
public void unlockMappedFile(final MappedFile mappedFile) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index d0855ab..b086aee 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -31,19 +31,18 @@ public class DispatchRequest {
private final int sysFlag;
private final long preparedTransactionOffset;
-
public DispatchRequest(
- final String topic,
- final int queueId,
- final long commitLogOffset,
- final int msgSize,
- final long tagsCode,
- final long storeTimestamp,
- final long consumeQueueOffset,
- final String keys,
- final String uniqKey,
- final int sysFlag,
- final long preparedTransactionOffset
+ final String topic,
+ final int queueId,
+ final long commitLogOffset,
+ final int msgSize,
+ final long tagsCode,
+ final long storeTimestamp,
+ final long consumeQueueOffset,
+ final String keys,
+ final String uniqKey,
+ final int sysFlag,
+ final long preparedTransactionOffset
) {
this.topic = topic;
this.queueId = queueId;
@@ -108,57 +107,46 @@ public class DispatchRequest {
this.success = success;
}
-
public String getTopic() {
return topic;
}
-
public int getQueueId() {
return queueId;
}
-
public long getCommitLogOffset() {
return commitLogOffset;
}
-
public int getMsgSize() {
return msgSize;
}
-
public long getStoreTimestamp() {
return storeTimestamp;
}
-
public long getConsumeQueueOffset() {
return consumeQueueOffset;
}
-
public String getKeys() {
return keys;
}
-
public long getTagsCode() {
return tagsCode;
}
-
public int getSysFlag() {
return sysFlag;
}
-
public long getPreparedTransactionOffset() {
return preparedTransactionOffset;
}
-
public boolean isSuccess() {
return success;
}
@@ -167,5 +155,4 @@ public class DispatchRequest {
return uniqKey;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 0f1ba8c..b7d33f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -6,27 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class GetMessageResult {
private final List<SelectMappedBufferResult> messageMapedList =
- new ArrayList<SelectMappedBufferResult>(100);
+ new ArrayList<SelectMappedBufferResult>(100);
private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
@@ -41,97 +39,79 @@ public class GetMessageResult {
private int msgCount4Commercial = 0;
-
public GetMessageResult() {
}
-
public GetMessageStatus getStatus() {
return status;
}
-
public void setStatus(GetMessageStatus status) {
this.status = status;
}
-
public long getNextBeginOffset() {
return nextBeginOffset;
}
-
public void setNextBeginOffset(long nextBeginOffset) {
this.nextBeginOffset = nextBeginOffset;
}
-
public long getMinOffset() {
return minOffset;
}
-
public void setMinOffset(long minOffset) {
this.minOffset = minOffset;
}
-
public long getMaxOffset() {
return maxOffset;
}
-
public void setMaxOffset(long maxOffset) {
this.maxOffset = maxOffset;
}
-
public List<SelectMappedBufferResult> getMessageMapedList() {
return messageMapedList;
}
-
public List<ByteBuffer> getMessageBufferList() {
return messageBufferList;
}
-
public void addMessage(final SelectMappedBufferResult mapedBuffer) {
this.messageMapedList.add(mapedBuffer);
this.messageBufferList.add(mapedBuffer.getByteBuffer());
this.bufferTotalSize += mapedBuffer.getSize();
- this.msgCount4Commercial += (int) Math.ceil(
- mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
+ this.msgCount4Commercial += (int)Math.ceil(
+ mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
-
public void release() {
for (SelectMappedBufferResult select : this.messageMapedList) {
select.release();
}
}
-
public int getBufferTotalSize() {
return bufferTotalSize;
}
-
public void setBufferTotalSize(int bufferTotalSize) {
this.bufferTotalSize = bufferTotalSize;
}
-
public int getMessageCount() {
return this.messageMapedList.size();
}
-
public boolean isSuggestPullingFromSlave() {
return suggestPullingFromSlave;
}
-
public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {
this.suggestPullingFromSlave = suggestPullingFromSlave;
}
@@ -144,12 +124,11 @@ public class GetMessageResult {
this.msgCount4Commercial = msgCount4Commercial;
}
-
@Override
public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
- + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
- + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+ + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
+ + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
index 003d1d4..f512e12 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index ce5f570..6803ec9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -16,16 +16,8 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.config.FlushDiskType;
-import org.apache.rocketmq.store.util.LibC;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -39,7 +31,13 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.util.LibC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.DirectBuffer;
public class MappedFile extends ReferenceResource {
public static final int OS_PAGE_SIZE = 1024 * 4;
@@ -48,34 +46,23 @@ public class MappedFile extends ReferenceResource {
private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0);
-
- private String fileName;
-
- private long fileFromOffset;
-
- protected int fileSize;
-
- private File file;
-
- private MappedByteBuffer mappedByteBuffer;
-
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
-
- private final AtomicInteger flushedPosition = new AtomicInteger(0);
//ADD BY ChenYang
protected final AtomicInteger committedPosition = new AtomicInteger(0);
-
-
+ private final AtomicInteger flushedPosition = new AtomicInteger(0);
+ protected int fileSize;
protected FileChannel fileChannel;
-
- private volatile long storeTimestamp = 0;
- private boolean firstCreateInQueue = false;
-
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
+ private String fileName;
+ private long fileFromOffset;
+ private File file;
+ private MappedByteBuffer mappedByteBuffer;
+ private volatile long storeTimestamp = 0;
+ private boolean firstCreateInQueue = false;
public MappedFile() {
}
@@ -88,41 +75,6 @@ public class MappedFile extends ReferenceResource {
init(fileName, fileSize, transientStorePool);
}
- public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
- init(fileName, fileSize);
- this.writeBuffer = transientStorePool.borrowBuffer();
- this.transientStorePool = transientStorePool;
- }
-
- private void init(final String fileName, final int fileSize) throws IOException {
- this.fileName = fileName;
- this.fileSize = fileSize;
- this.file = new File(fileName);
- this.fileFromOffset = Long.parseLong(this.file.getName());
- boolean ok = false;
-
- ensureDirOK(this.file.getParent());
-
- try {
- this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
- this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
- TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
- TOTAL_MAPED_FILES.incrementAndGet();
- ok = true;
- } catch (FileNotFoundException e) {
- log.error("create file channel " + this.fileName + " Failed. ", e);
- throw e;
- } catch (IOException e) {
- log.error("map file " + this.fileName + " Failed. ", e);
- throw e;
- } finally {
- if (!ok && this.fileChannel != null) {
- this.fileChannel.close();
- }
- }
- }
-
-
public static void ensureDirOK(final String dirName) {
if (dirName != null) {
File f = new File(dirName);
@@ -133,14 +85,12 @@ public class MappedFile extends ReferenceResource {
}
}
-
public static void clean(final ByteBuffer buffer) {
if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
return;
invoke(invoke(viewed(buffer), "cleaner"), "clean");
}
-
private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
return AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
@@ -155,9 +105,8 @@ public class MappedFile extends ReferenceResource {
});
}
-
private static Method method(Object target, String methodName, Class<?>[] args)
- throws NoSuchMethodException {
+ throws NoSuchMethodException {
try {
return target.getClass().getMethod(methodName, args);
} catch (NoSuchMethodException e) {
@@ -165,11 +114,9 @@ public class MappedFile extends ReferenceResource {
}
}
-
private static ByteBuffer viewed(ByteBuffer buffer) {
String methodName = "viewedBuffer";
-
Method[] methods = buffer.getClass().getMethods();
for (int i = 0; i < methods.length; i++) {
if (methods[i].getName().equals("attachment")) {
@@ -178,23 +125,54 @@ public class MappedFile extends ReferenceResource {
}
}
- ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
+ ByteBuffer viewedBuffer = (ByteBuffer)invoke(buffer, methodName);
if (viewedBuffer == null)
return buffer;
else
return viewed(viewedBuffer);
}
-
public static int getTotalmapedfiles() {
return TOTAL_MAPED_FILES.get();
}
-
public static long getTotalMapedVitualMemory() {
return TOTAL_MAPED_VITUAL_MEMORY.get();
}
+ public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+ init(fileName, fileSize);
+ this.writeBuffer = transientStorePool.borrowBuffer();
+ this.transientStorePool = transientStorePool;
+ }
+
+ private void init(final String fileName, final int fileSize) throws IOException {
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ this.file = new File(fileName);
+ this.fileFromOffset = Long.parseLong(this.file.getName());
+ boolean ok = false;
+
+ ensureDirOK(this.file.getParent());
+
+ try {
+ this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
+ this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+ TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
+ TOTAL_MAPED_FILES.incrementAndGet();
+ ok = true;
+ } catch (FileNotFoundException e) {
+ log.error("create file channel " + this.fileName + " Failed. ", e);
+ throw e;
+ } catch (IOException e) {
+ log.error("map file " + this.fileName + " Failed. ", e);
+ throw e;
+ } finally {
+ if (!ok && this.fileChannel != null) {
+ this.fileChannel.close();
+ }
+ }
+ }
public long getLastModifiedTimestamp() {
return this.file.lastModified();
@@ -214,20 +192,18 @@ public class MappedFile extends ReferenceResource {
int currentPos = this.wrotePosition.get();
-
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result =
- cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
+ cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
-
log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
- + this.fileSize);
+ + this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
@@ -246,7 +222,6 @@ public class MappedFile extends ReferenceResource {
public boolean appendMessage(final byte[] data) {
int currentPos = this.wrotePosition.get();
-
if ((currentPos + data.length) <= this.fileSize) {
try {
this.fileChannel.position(currentPos);
@@ -262,11 +237,7 @@ public class MappedFile extends ReferenceResource {
}
/**
-
- *
* @param flushLeastPages
-
- *
* @return The current flushed position
*/
public int flush(final int flushLeastPages) {
@@ -370,12 +341,10 @@ public class MappedFile extends ReferenceResource {
return flushedPosition.get();
}
-
public void setFlushedPosition(int pos) {
this.flushedPosition.set(pos);
}
-
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
@@ -392,14 +361,13 @@ public class MappedFile extends ReferenceResource {
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
- + this.fileFromOffset);
+ + this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
- + ", fileFromOffset: " + this.fileFromOffset);
+ + ", fileFromOffset: " + this.fileFromOffset);
}
-
return null;
}
@@ -419,7 +387,6 @@ public class MappedFile extends ReferenceResource {
}
}
-
return null;
}
@@ -427,13 +394,13 @@ public class MappedFile extends ReferenceResource {
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have not shutdown, stop unmaping.");
+ + " have not shutdown, stop unmaping.");
return false;
}
if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have cleanup, do not do it again.");
+ + " have cleanup, do not do it again.");
return true;
}
@@ -455,9 +422,9 @@ public class MappedFile extends ReferenceResource {
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
- + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
- + this.getFlushedPosition() + ", "
- + UtilAll.computeEclipseTimeMilliseconds(beginTime));
+ + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ + this.getFlushedPosition() + ", "
+ + UtilAll.computeEclipseTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
@@ -465,7 +432,7 @@ public class MappedFile extends ReferenceResource {
return true;
} else {
log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName
- + " Failed. cleanupOver: " + this.cleanupOver);
+ + " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
@@ -475,18 +442,17 @@ public class MappedFile extends ReferenceResource {
return wrotePosition.get();
}
+ public void setWrotePosition(int pos) {
+ this.wrotePosition.set(pos);
+ }
+
/**
- *
* @return The max position which have valid data
*/
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
- public void setWrotePosition(int pos) {
- this.wrotePosition.set(pos);
- }
-
public void setCommittedPosition(int pos) {
this.committedPosition.set(pos);
}
@@ -497,7 +463,7 @@ public class MappedFile extends ReferenceResource {
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
- byteBuffer.put(i, (byte) 0);
+ byteBuffer.put(i, (byte)0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
@@ -521,11 +487,11 @@ public class MappedFile extends ReferenceResource {
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
- this.getFileName(), System.currentTimeMillis() - beginTime);
+ this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
- System.currentTimeMillis() - beginTime);
+ System.currentTimeMillis() - beginTime);
this.mlock();
}
@@ -542,25 +508,21 @@ public class MappedFile extends ReferenceResource {
return this.mappedByteBuffer.slice();
}
-
public long getStoreTimestamp() {
return storeTimestamp;
}
-
public boolean isFirstCreateInQueue() {
return firstCreateInQueue;
}
-
public void setFirstCreateInQueue(boolean firstCreateInQueue) {
this.firstCreateInQueue = firstCreateInQueue;
}
-
public void mlock() {
final long beginTime = System.currentTimeMillis();
- final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+ final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
@@ -575,7 +537,7 @@ public class MappedFile extends ReferenceResource {
public void munlock() {
final long beginTime = System.currentTimeMillis();
- final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+ final long address = ((DirectBuffer)(this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a208a07..49455c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -16,17 +16,19 @@
*/
package org.apache.rocketmq.store;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-
public class MappedFileQueue {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
@@ -46,15 +48,13 @@ public class MappedFileQueue {
private volatile long storeTimestamp = 0;
-
public MappedFileQueue(final String storePath, int mappedFileSize,
- AllocateMappedFileService allocateMappedFileService) {
+ AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
-
public void checkSelf() {
if (!this.mappedFiles.isEmpty()) {
@@ -66,7 +66,7 @@ public class MappedFileQueue {
if (pre != null) {
if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
- pre.getFileName(), cur.getFileName());
+ pre.getFileName(), cur.getFileName());
}
}
pre = cur;
@@ -74,7 +74,6 @@ public class MappedFileQueue {
}
}
-
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);
@@ -82,16 +81,15 @@ public class MappedFileQueue {
return null;
for (int i = 0; i < mfs.length; i++) {
- MappedFile mappedFile = (MappedFile) mfs[i];
+ MappedFile mappedFile = (MappedFile)mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
- return (MappedFile) mfs[mfs.length - 1];
+ return (MappedFile)mfs[mfs.length - 1];
}
-
private Object[] copyMappedFiles(final int reservedMappedFiles) {
Object[] mfs;
@@ -103,7 +101,6 @@ public class MappedFileQueue {
return mfs;
}
-
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
@@ -111,9 +108,9 @@ public class MappedFileQueue {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
- file.setWrotePosition((int) (offset % this.mappedFileSize));
- file.setCommittedPosition((int) (offset % this.mappedFileSize));
- file.setFlushedPosition((int) (offset % this.mappedFileSize));
+ file.setWrotePosition((int)(offset % this.mappedFileSize));
+ file.setCommittedPosition((int)(offset % this.mappedFileSize));
+ file.setFlushedPosition((int)(offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
@@ -124,7 +121,6 @@ public class MappedFileQueue {
this.deleteExpiredFile(willRemoveFiles);
}
-
private void deleteExpiredFile(List<MappedFile> files) {
if (!files.isEmpty()) {
@@ -148,7 +144,6 @@ public class MappedFileQueue {
}
}
-
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
@@ -159,11 +154,10 @@ public class MappedFileQueue {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
- + " length not matched message store config value, ignore it");
+ + " length not matched message store config value, ignore it");
return true;
}
-
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
@@ -182,7 +176,6 @@ public class MappedFileQueue {
return true;
}
-
public long howMuchFallBehind() {
if (this.mappedFiles.isEmpty())
return 0;
@@ -198,7 +191,6 @@ public class MappedFileQueue {
return 0;
}
-
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
@@ -214,12 +206,12 @@ public class MappedFileQueue {
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
- + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
+ + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
- nextNextFilePath, this.mappedFileSize);
+ nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
@@ -268,11 +260,12 @@ public class MappedFileQueue {
if (mappedFileLast != null) {
long lastOffset = mappedFileLast.getFileFromOffset() +
- mappedFileLast.getWrotePosition();
+ mappedFileLast.getWrotePosition();
long diff = lastOffset - offset;
final int maxDiff = this.mappedFileSize * 2;
- if (diff > maxDiff) return false;
+ if (diff > maxDiff)
+ return false;
}
ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
@@ -280,7 +273,7 @@ public class MappedFileQueue {
while (iterator.hasPrevious()) {
mappedFileLast = iterator.previous();
if (offset >= mappedFileLast.getFileFromOffset()) {
- int where = (int) (offset % mappedFileLast.getFileSize());
+ int where = (int)(offset % mappedFileLast.getFileSize());
mappedFileLast.setFlushedPosition(where);
mappedFileLast.setWrotePosition(where);
mappedFileLast.setCommittedPosition(where);
@@ -306,7 +299,6 @@ public class MappedFileQueue {
return -1;
}
-
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
@@ -342,9 +334,9 @@ public class MappedFileQueue {
}
public int deleteExpiredFileByTime(final long expiredTime,
- final int deleteFilesInterval,
- final long intervalForcibly,
- final boolean cleanImmediately) {
+ final int deleteFilesInterval,
+ final long intervalForcibly,
+ final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
@@ -355,7 +347,7 @@ public class MappedFileQueue {
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
- MappedFile mappedFile = (MappedFile) mfs[i];
+ MappedFile mappedFile = (MappedFile)mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
@@ -384,7 +376,6 @@ public class MappedFileQueue {
return deleteCount;
}
-
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
@@ -396,7 +387,7 @@ public class MappedFileQueue {
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
- MappedFile mappedFile = (MappedFile) mfs[i];
+ MappedFile mappedFile = (MappedFile)mfs[i];
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
@@ -404,7 +395,7 @@ public class MappedFileQueue {
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
- + maxOffsetInLogicQueue + ", delete it");
+ + maxOffsetInLogicQueue + ", delete it");
}
} else {
log.warn("this being not executed forever.");
@@ -425,7 +416,6 @@ public class MappedFileQueue {
return deleteCount;
}
-
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
@@ -467,10 +457,10 @@ public class MappedFileQueue {
try {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
- int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
+ int index = (int)((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
- "mappedFileSize: {}, mappedFiles count: {}",
+ "mappedFileSize: {}, mappedFiles count: {}",
mappedFile,
offset,
index,
@@ -494,7 +484,6 @@ public class MappedFileQueue {
return null;
}
-
public MappedFile getFirstMappedFile() {
MappedFile mappedFileFirst = null;
@@ -515,14 +504,13 @@ public class MappedFileQueue {
return findMappedFileByOffset(offset, false);
}
-
public long getMappedMemorySize() {
long size = 0;
Object[] mfs = this.copyMappedFiles(0);
if (mfs != null) {
for (Object mf : mfs) {
- if (((ReferenceResource) mf).isAvailable()) {
+ if (((ReferenceResource)mf).isAvailable()) {
size += this.mappedFileSize;
}
}
@@ -531,7 +519,6 @@ public class MappedFileQueue {
return size;
}
-
public boolean retryDeleteFirstFile(final long intervalForcibly) {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
@@ -554,14 +541,12 @@ public class MappedFileQueue {
return false;
}
-
public void shutdown(final long intervalForcibly) {
for (MappedFile mf : this.mappedFiles) {
mf.shutdown(intervalForcibly);
}
}
-
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
@@ -576,27 +561,22 @@ public class MappedFileQueue {
}
}
-
public long getFlushedWhere() {
return flushedWhere;
}
-
public void setFlushedWhere(long flushedWhere) {
this.flushedWhere = flushedWhere;
}
-
public long getStoreTimestamp() {
return storeTimestamp;
}
-
public List<MappedFile> getMappedFiles() {
return mappedFiles;
}
-
public int getMappedFileSize() {
return mappedFileSize;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
index 25304b9..ebc57a7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
index dabb418..4cbdacf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageExt;
-
public class MessageExtBrokerInner extends MessageExt {
private static final long serialVersionUID = 7256001576878700634L;
private String propertiesString;
@@ -32,22 +31,18 @@ public class MessageExtBrokerInner extends MessageExt {
return tags.hashCode();
}
-
public String getPropertiesString() {
return propertiesString;
}
-
public void setPropertiesString(String propertiesString) {
this.propertiesString = propertiesString;
}
-
public long getTagsCode() {
return tagsCode;
}
-
public void setTagsCode(long tagsCode) {
this.tagsCode = tagsCode;
}