You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:28 UTC
[11/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
new file mode 100644
index 0000000..f23eb1c
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumeQueue {
+
+ public static final int CQ_STORE_UNIT_SIZE = 20;
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
+
+ private final DefaultMessageStore defaultMessageStore;
+
+ private final MappedFileQueue mappedFileQueue;
+ private final String topic;
+ private final int queueId;
+ private final ByteBuffer byteBufferIndex;
+
+ private final String storePath;
+ private final int mappedFileSize;
+ private long maxPhysicOffset = -1;
+ private volatile long minLogicOffset = 0;
+
+
+ public ConsumeQueue(
+ final String topic,
+ final int queueId,
+ final String storePath,
+ final int mappedFileSize,
+ final DefaultMessageStore defaultMessageStore) {
+ this.storePath = storePath;
+ this.mappedFileSize = mappedFileSize;
+ this.defaultMessageStore = defaultMessageStore;
+
+ this.topic = topic;
+ this.queueId = queueId;
+
+ String queueDir = this.storePath
+ + File.separator + topic
+ + File.separator + queueId;
+
+ this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
+
+ this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+ }
+
+
+ public boolean load() {
+ boolean result = this.mappedFileQueue.load();
+ log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
+ return result;
+ }
+
+
+ public void recover() {
+ final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ if (!mappedFiles.isEmpty()) {
+
+ int index = mappedFiles.size() - 3;
+ if (index < 0)
+ index = 0;
+
+ int mapedFileSizeLogics = this.mappedFileSize;
+ MappedFile mappedFile = mappedFiles.get(index);
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ long processOffset = mappedFile.getFileFromOffset();
+ long mapedFileOffset = 0;
+ while (true) {
+ for (int i = 0; i < mapedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
+ long offset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ long tagsCode = byteBuffer.getLong();
+
+ if (offset >= 0 && size > 0) {
+ mapedFileOffset = i + CQ_STORE_UNIT_SIZE;
+ this.maxPhysicOffset = offset;
+ } else {
+ log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ + offset + " " + size + " " + tagsCode);
+ break;
+ }
+ }
+
+
+ if (mapedFileOffset == mapedFileSizeLogics) {
+ index++;
+ if (index >= mappedFiles.size()) {
+
+ log.info("recover last consume queue file over, last maped file "
+ + mappedFile.getFileName());
+ break;
+ } else {
+ mappedFile = mappedFiles.get(index);
+ byteBuffer = mappedFile.sliceByteBuffer();
+ processOffset = mappedFile.getFileFromOffset();
+ mapedFileOffset = 0;
+ log.info("recover next consume queue file, " + mappedFile.getFileName());
+ }
+ } else {
+ log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ + (processOffset + mapedFileOffset));
+ break;
+ }
+ }
+
+ processOffset += mapedFileOffset;
+ this.mappedFileQueue.setFlushedWhere(processOffset);
+ this.mappedFileQueue.setCommittedWhere(processOffset);
+ this.mappedFileQueue.truncateDirtyFiles(processOffset);
+ }
+ }
+
+ public long getOffsetInQueueByTime(final long timestamp) {
+ MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
+ if (mappedFile != null) {
+ long offset = 0;
+ int low =
+ minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile
+ .getFileFromOffset()) : 0;
+ int high = 0;
+ int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
+ long leftIndexValue = -1L, rightIndexValue = -1L;
+ long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
+ SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
+ if (null != sbr) {
+ ByteBuffer byteBuffer = sbr.getByteBuffer();
+ high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
+ try {
+ while (high >= low) {
+ midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
+ byteBuffer.position(midOffset);
+ long phyOffset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ if (phyOffset < minPhysicOffset) {
+ low = midOffset + CQ_STORE_UNIT_SIZE;
+ leftOffset = midOffset;
+ continue;
+ }
+
+ long storeTime =
+ this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ if (storeTime < 0) {
+ return 0;
+ } else if (storeTime == timestamp) {
+ targetOffset = midOffset;
+ break;
+ } else if (storeTime > timestamp) {
+ high = midOffset - CQ_STORE_UNIT_SIZE;
+ rightOffset = midOffset;
+ rightIndexValue = storeTime;
+ } else {
+ low = midOffset + CQ_STORE_UNIT_SIZE;
+ leftOffset = midOffset;
+ leftIndexValue = storeTime;
+ }
+ }
+
+ if (targetOffset != -1) {
+
+ offset = targetOffset;
+ } else {
+ if (leftIndexValue == -1) {
+
+ offset = rightOffset;
+ } else if (rightIndexValue == -1) {
+
+ offset = leftOffset;
+ } else {
+ offset =
+ Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
+ - rightIndexValue) ? rightOffset : leftOffset;
+ }
+ }
+
+ return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
+ } finally {
+ sbr.release();
+ }
+ }
+ }
+ return 0;
+ }
+
+ public void truncateDirtyLogicFiles(long phyOffet) {
+
+ int logicFileSize = this.mappedFileSize;
+
+ this.maxPhysicOffset = phyOffet - 1;
+
+ while (true) {
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+ if (mappedFile != null) {
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+
+ mappedFile.setWrotePosition(0);
+ mappedFile.setCommittedPosition(0);
+ mappedFile.setFlushedPosition(0);
+
+ for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
+ long offset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ byteBuffer.getLong();
+
+
+ if (0 == i) {
+ if (offset >= phyOffet) {
+ this.mappedFileQueue.deleteLastMappedFile();
+ break;
+ } else {
+ int pos = i + CQ_STORE_UNIT_SIZE;
+ mappedFile.setWrotePosition(pos);
+ mappedFile.setCommittedPosition(pos);
+ mappedFile.setFlushedPosition(pos);
+ this.maxPhysicOffset = offset;
+ }
+ } else {
+
+ if (offset >= 0 && size > 0) {
+
+ if (offset >= phyOffet) {
+ return;
+ }
+
+ int pos = i + CQ_STORE_UNIT_SIZE;
+ mappedFile.setWrotePosition(pos);
+ mappedFile.setCommittedPosition(pos);
+ mappedFile.setFlushedPosition(pos);
+ this.maxPhysicOffset = offset;
+
+
+ if (pos == logicFileSize) {
+ return;
+ }
+ } else {
+ return;
+ }
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ public long getLastOffset() {
+ long lastOffset = -1;
+
+ int logicFileSize = this.mappedFileSize;
+
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+ if (mappedFile != null) {
+
+ int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;
+ if (position < 0)
+ position = 0;
+
+ ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+ byteBuffer.position(position);
+ for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
+ long offset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ byteBuffer.getLong();
+
+
+ if (offset >= 0 && size > 0) {
+ lastOffset = offset + size;
+ } else {
+ break;
+ }
+ }
+ }
+
+ return lastOffset;
+ }
+
+
+ public boolean flush(final int flushLeastPages) {
+ return this.mappedFileQueue.flush(flushLeastPages);
+ }
+
+
+ public int deleteExpiredFile(long offset) {
+ int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
+ this.correctMinOffset(offset);
+ return cnt;
+ }
+
+ public void correctMinOffset(long phyMinOffset) {
+ MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
+ if (mappedFile != null) {
+ SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
+ if (result != null) {
+ try {
+
+ for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ long offsetPy = result.getByteBuffer().getLong();
+ result.getByteBuffer().getInt();
+ result.getByteBuffer().getLong();
+
+ if (offsetPy >= phyMinOffset) {
+ this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
+ log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: "
+ + this.topic + ", queueId: " + this.queueId);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ result.release();
+ }
+ }
+ }
+ }
+
+
+ public long getMinOffsetInQuque() {
+ return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
+ }
+
+
+ public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
+ long logicOffset) {
+ final int maxRetries = 30;
+ boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
+ for (int i = 0; i < maxRetries && canWrite; i++) {
+ boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
+ if (result) {
+ this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
+ return;
+ } else {
+ // XXX: warn and notify me
+ log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
+ + " failed, retry " + i + " times");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ }
+ }
+ }
+
+ // XXX: warn and notify me
+ log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
+ this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+ }
+
+ private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
+ final long cqOffset) {
+
+ if (offset <= this.maxPhysicOffset) {
+ return true;
+ }
+
+ this.byteBufferIndex.flip();
+ this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
+ this.byteBufferIndex.putLong(offset);
+ this.byteBufferIndex.putInt(size);
+ this.byteBufferIndex.putLong(tagsCode);
+
+ final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
+
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
+ if (mappedFile != null) {
+
+ if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
+ this.minLogicOffset = expectLogicOffset;
+ this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
+ this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
+ this.fillPreBlank(mappedFile, expectLogicOffset);
+ log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ + mappedFile.getWrotePosition());
+ }
+
+ if (cqOffset != 0) {
+ long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+ if (expectLogicOffset != currentLogicOffset) {
+ LOG_ERROR.warn(
+ "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+ expectLogicOffset,
+ currentLogicOffset,
+ this.topic,
+ this.queueId,
+ expectLogicOffset - currentLogicOffset
+ );
+ }
+ }
+ this.maxPhysicOffset = offset;
+ return mappedFile.appendMessage(this.byteBufferIndex.array());
+ }
+ return false;
+ }
+
+
+ private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+ byteBuffer.putLong(0L);
+ byteBuffer.putInt(Integer.MAX_VALUE);
+ byteBuffer.putLong(0L);
+
+ int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
+ for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
+ mappedFile.appendMessage(byteBuffer.array());
+ }
+ }
+
+ public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
+ int mappedFileSize = this.mappedFileSize;
+ long offset = startIndex * CQ_STORE_UNIT_SIZE;
+ if (offset >= this.getMinLogicOffset()) {
+ MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
+ if (mappedFile != null) {
+ SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
+ return result;
+ }
+ }
+ return null;
+ }
+
+ public long getMinLogicOffset() {
+ return minLogicOffset;
+ }
+
+ public void setMinLogicOffset(long minLogicOffset) {
+ this.minLogicOffset = minLogicOffset;
+ }
+
+ public long rollNextFile(final long index) {
+ int mapedFileSize = this.mappedFileSize;
+ int totalUnitsInFile = mapedFileSize / CQ_STORE_UNIT_SIZE;
+ return index + totalUnitsInFile - index % totalUnitsInFile;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public long getMaxPhysicOffset() {
+ return maxPhysicOffset;
+ }
+
+ public void setMaxPhysicOffset(long maxPhysicOffset) {
+ this.maxPhysicOffset = maxPhysicOffset;
+ }
+
+ public void destroy() {
+ this.maxPhysicOffset = -1;
+ this.minLogicOffset = 0;
+ this.mappedFileQueue.destroy();
+ }
+
+ public long getMessageTotalInQueue() {
+ return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque();
+ }
+
+
+ public long getMaxOffsetInQuque() {
+ return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
+ }
+
+
+ public void checkSelf() {
+ mappedFileQueue.checkSelf();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
new file mode 100644
index 0000000..2086438
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMessageFilter implements MessageFilter {
+
+ @Override
+ public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
+ if (tagsCode == null) {
+ return true;
+ }
+
+ if (null == subscriptionData) {
+ return true;
+ }
+
+ if (subscriptionData.isClassFilterMode())
+ return true;
+
+ if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
+ return true;
+ }
+
+ return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+ }
+
+}