You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:28 UTC

[11/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
new file mode 100644
index 0000000..f23eb1c
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumeQueue {
+
+    // Size in bytes of one queue entry as written by putMessagePositionInfo:
+    // 8-byte commit-log offset + 4-byte message size + 8-byte tags code.
+    public static final int CQ_STORE_UNIT_SIZE = 20;
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
+
+    private final DefaultMessageStore defaultMessageStore;
+
+    // Files backing this queue; created in the constructor under storePath/topic/queueId.
+    private final MappedFileQueue mappedFileQueue;
+    private final String topic;
+    private final int queueId;
+    // Reusable CQ_STORE_UNIT_SIZE-byte scratch buffer used to encode one entry before it is appended.
+    private final ByteBuffer byteBufferIndex;
+
+    private final String storePath;
+    private final int mappedFileSize;
+    // Commit-log offset of the most recent entry recorded in this queue; -1 when none has been recorded.
+    private long maxPhysicOffset = -1;
+    // Smallest valid logical offset, in bytes (divide by CQ_STORE_UNIT_SIZE to get an entry index).
+    private volatile long minLogicOffset = 0;
+
+
+    public ConsumeQueue(
+            final String topic,
+            final int queueId,
+            final String storePath,
+            final int mappedFileSize,
+            final DefaultMessageStore defaultMessageStore) {
+        this.storePath = storePath;
+        this.mappedFileSize = mappedFileSize;
+        this.defaultMessageStore = defaultMessageStore;
+
+        this.topic = topic;
+        this.queueId = queueId;
+
+        String queueDir = this.storePath
+                + File.separator + topic
+                + File.separator + queueId;
+
+        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
+
+        this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+    }
+
+
+    /**
+     * Loads the mapped files of this queue and logs the outcome.
+     *
+     * @return whatever {@code mappedFileQueue.load()} reported
+     */
+    public boolean load() {
+        final boolean loaded = this.mappedFileQueue.load();
+        final String outcome = loaded ? "OK" : "Failed";
+        log.info("load consume queue " + this.topic + "-" + this.queueId + " " + outcome);
+        return loaded;
+    }
+
+
+    /**
+     * Rebuilds the write state of this queue from the mapped files that were loaded.
+     * <p>
+     * Starting at the third-from-last mapped file (or the first one when there are fewer
+     * than three), entries are read in order until one fails the validity check
+     * ({@code offset >= 0 && size > 0}) or the last file has been read completely. The
+     * commit-log offset of the last valid entry seen is stored in {@code maxPhysicOffset}.
+     * The logical offset at which scanning stopped is then used as the flushed and
+     * committed position of the mapped file queue and handed to
+     * {@code truncateDirtyFiles}. Nothing happens when there are no mapped files.
+     */
+    public void recover() {
+        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+
+            // Begin with the third-from-last file, clamped to the first file.
+            int index = mappedFiles.size() - 3;
+            if (index < 0)
+                index = 0;
+
+            int mapedFileSizeLogics = this.mappedFileSize;
+            MappedFile mappedFile = mappedFiles.get(index);
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            // Logical offset at which the file currently being scanned starts.
+            long processOffset = mappedFile.getFileFromOffset();
+            // Number of bytes of the current file that hold valid entries.
+            long mapedFileOffset = 0;
+            while (true) {
+                // Read the current file entry by entry: offset (long), size (int), tags code (long).
+                for (int i = 0; i < mapedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
+                    long offset = byteBuffer.getLong();
+                    int size = byteBuffer.getInt();
+                    long tagsCode = byteBuffer.getLong();
+
+                    if (offset >= 0 && size > 0) {
+                        mapedFileOffset = i + CQ_STORE_UNIT_SIZE;
+                        this.maxPhysicOffset = offset;
+                    } else {
+                        // First invalid entry: valid data of this file ends here.
+                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
+                                + offset + " " + size + " " + tagsCode);
+                        break;
+                    }
+                }
+
+
+                // The whole file was valid: continue with the next file, if there is one.
+                if (mapedFileOffset == mapedFileSizeLogics) {
+                    index++;
+                    if (index >= mappedFiles.size()) {
+
+                        log.info("recover last consume queue file over, last maped file "
+                                + mappedFile.getFileName());
+                        break;
+                    } else {
+                        mappedFile = mappedFiles.get(index);
+                        byteBuffer = mappedFile.sliceByteBuffer();
+                        processOffset = mappedFile.getFileFromOffset();
+                        mapedFileOffset = 0;
+                        log.info("recover next consume queue file, " + mappedFile.getFileName());
+                    }
+                } else {
+                    // The file was only partially valid: scanning ends inside it.
+                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+                            + (processOffset + mapedFileOffset));
+                    break;
+                }
+            }
+
+            // Logical offset where scanning stopped; everything from here on is treated as dirty.
+            processOffset += mapedFileOffset;
+            this.mappedFileQueue.setFlushedWhere(processOffset);
+            this.mappedFileQueue.setCommittedWhere(processOffset);
+            this.mappedFileQueue.truncateDirtyFiles(processOffset);
+        }
+    }
+
+    /**
+     * Looks up the queue entry whose message store time matches {@code timestamp}, or
+     * comes closest to it, and returns that entry's index in the queue.
+     * <p>
+     * The mapped file is chosen by {@code getMappedFileByTime}; inside it a binary search
+     * over the fixed-size entries is performed, reading each probed message's store time
+     * from the commit log.
+     *
+     * @param timestamp store time to search for
+     * @return index (logical offset divided by {@code CQ_STORE_UNIT_SIZE}) of the chosen
+     * entry; 0 when no mapped file is found, when its buffer cannot be selected, or when
+     * {@code pickupStoreTimestamp} returns a negative value
+     */
+    public long getOffsetInQueueByTime(final long timestamp) {
+        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
+        if (mappedFile != null) {
+            long offset = 0;
+            // Lower bound (byte position inside the file): skip anything below minLogicOffset.
+            int low =
+                    minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile
+                            .getFileFromOffset()) : 0;
+            int high = 0;
+            // -1 means "not set" for the positions and store times tracked during the search.
+            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
+            long leftIndexValue = -1L, rightIndexValue = -1L;
+            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
+            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
+            if (null != sbr) {
+                ByteBuffer byteBuffer = sbr.getByteBuffer();
+                // Upper bound: position of the last whole entry in the selected buffer.
+                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
+                try {
+                    while (high >= low) {
+                        // Midpoint rounded down to an entry boundary.
+                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
+                        byteBuffer.position(midOffset);
+                        long phyOffset = byteBuffer.getLong();
+                        int size = byteBuffer.getInt();
+                        // Entry points below the store's minimum commit-log offset: search to the right.
+                        if (phyOffset < minPhysicOffset) {
+                            low = midOffset + CQ_STORE_UNIT_SIZE;
+                            leftOffset = midOffset;
+                            continue;
+                        }
+
+                        long storeTime =
+                                this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                        if (storeTime < 0) {
+                            // Negative store time: give up and report index 0.
+                            return 0;
+                        } else if (storeTime == timestamp) {
+                            targetOffset = midOffset;
+                            break;
+                        } else if (storeTime > timestamp) {
+                            // Probed message is newer: remember it as the right neighbour, go left.
+                            high = midOffset - CQ_STORE_UNIT_SIZE;
+                            rightOffset = midOffset;
+                            rightIndexValue = storeTime;
+                        } else {
+                            // Probed message is older: remember it as the left neighbour, go right.
+                            low = midOffset + CQ_STORE_UNIT_SIZE;
+                            leftOffset = midOffset;
+                            leftIndexValue = storeTime;
+                        }
+                    }
+
+                    if (targetOffset != -1) {
+
+                        // Exact match.
+                        offset = targetOffset;
+                    } else {
+                        if (leftIndexValue == -1) {
+
+                            // No older message with a known store time was probed: take the right neighbour.
+                            offset = rightOffset;
+                        } else if (rightIndexValue == -1) {
+
+                            // No newer message was probed: take the left neighbour.
+                            offset = leftOffset;
+                        } else {
+                            // Both neighbours known: take the one closer in time (left on a tie).
+                            offset =
+                                    Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
+                                            - rightIndexValue) ? rightOffset : leftOffset;
+                        }
+                    }
+
+                    // Convert the byte position into an entry index within the whole queue.
+                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
+                } finally {
+                    sbr.release();
+                }
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Discards queue entries that reference commit-log offsets at or beyond
+     * {@code phyOffet}.
+     * <p>
+     * Works on the last mapped file repeatedly: a file whose first entry is already at or
+     * beyond {@code phyOffet} is deleted and the new last file is examined; otherwise the
+     * file's wrote/committed/flushed positions are advanced entry by entry until an entry
+     * at or beyond {@code phyOffet}, an invalid entry, or the end of the file is reached.
+     * {@code maxPhysicOffset} is set to {@code phyOffet - 1} first and then to the offset
+     * of each entry that is kept.
+     *
+     * @param phyOffet first commit-log offset that must no longer be referenced
+     */
+    public void truncateDirtyLogicFiles(long phyOffet) {
+
+        int logicFileSize = this.mappedFileSize;
+
+        this.maxPhysicOffset = phyOffet - 1;
+
+        while (true) {
+            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+            if (mappedFile != null) {
+                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+
+                // Reset all positions; they are advanced again below for every entry that is kept.
+                mappedFile.setWrotePosition(0);
+                mappedFile.setCommittedPosition(0);
+                mappedFile.setFlushedPosition(0);
+
+                for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
+                    long offset = byteBuffer.getLong();
+                    int size = byteBuffer.getInt();
+                    // Tags code is read only to move the buffer to the next entry.
+                    byteBuffer.getLong();
+
+
+                    if (0 == i) {
+                        // First entry decides whether the whole file is dirty.
+                        if (offset >= phyOffet) {
+                            this.mappedFileQueue.deleteLastMappedFile();
+                            break;
+                        } else {
+                            int pos = i + CQ_STORE_UNIT_SIZE;
+                            mappedFile.setWrotePosition(pos);
+                            mappedFile.setCommittedPosition(pos);
+                            mappedFile.setFlushedPosition(pos);
+                            this.maxPhysicOffset = offset;
+                        }
+                    } else {
+
+                        if (offset >= 0 && size > 0) {
+
+                            // First dirty entry inside the file: everything before it has been kept.
+                            if (offset >= phyOffet) {
+                                return;
+                            }
+
+                            int pos = i + CQ_STORE_UNIT_SIZE;
+                            mappedFile.setWrotePosition(pos);
+                            mappedFile.setCommittedPosition(pos);
+                            mappedFile.setFlushedPosition(pos);
+                            this.maxPhysicOffset = offset;
+
+
+                            // File is completely filled with entries that are kept.
+                            if (pos == logicFileSize) {
+                                return;
+                            }
+                        } else {
+                            // Invalid entry: no further data in this file.
+                            return;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Returns the commit-log position just past the message referenced by the last valid
+     * entry of the last mapped file ({@code offset + size} of that entry).
+     * <p>
+     * Scanning starts one entry before the file's wrote position and continues while
+     * entries are valid ({@code offset >= 0 && size > 0}).
+     *
+     * @return end position of the last referenced message, or -1 when there is no mapped
+     * file or no valid entry is found
+     */
+    public long getLastOffset() {
+        long lastOffset = -1;
+
+        int logicFileSize = this.mappedFileSize;
+
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+        if (mappedFile != null) {
+
+            int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;
+            if (position < 0) {
+                position = 0;
+            }
+
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            byteBuffer.position(position);
+            // The scan starts in the middle of the buffer, so fewer than logicFileSize bytes
+            // are left. Stop once no whole entry remains; otherwise a completely filled file
+            // makes the next getLong() throw BufferUnderflowException.
+            for (int i = 0;
+                 i < logicFileSize && byteBuffer.remaining() >= CQ_STORE_UNIT_SIZE;
+                 i += CQ_STORE_UNIT_SIZE) {
+                long offset = byteBuffer.getLong();
+                int size = byteBuffer.getInt();
+                // Tags code is read only to move the buffer to the next entry.
+                byteBuffer.getLong();
+
+                if (offset >= 0 && size > 0) {
+                    lastOffset = offset + size;
+                } else {
+                    break;
+                }
+            }
+        }
+
+        return lastOffset;
+    }
+
+
+    /**
+     * Delegates flushing to the underlying mapped file queue.
+     *
+     * @param flushLeastPages passed through unchanged to {@code mappedFileQueue.flush}
+     * @return the value returned by {@code mappedFileQueue.flush}
+     */
+    public boolean flush(final int flushLeastPages) {
+        final boolean flushResult = this.mappedFileQueue.flush(flushLeastPages);
+        return flushResult;
+    }
+
+
+    /**
+     * Asks the mapped file queue to delete files that are expired with respect to
+     * {@code offset} and then recomputes the minimum logical offset of this queue.
+     *
+     * @param offset commit-log offset passed to {@code deleteExpiredFileByOffset} and
+     *               {@code correctMinOffset}
+     * @return the count reported by {@code deleteExpiredFileByOffset}
+     */
+    public int deleteExpiredFile(long offset) {
+        final int deletedCount =
+                this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
+        this.correctMinOffset(offset);
+        return deletedCount;
+    }
+
+    /**
+     * Recomputes {@code minLogicOffset}: scans the first mapped file for the first entry
+     * whose commit-log offset is not below {@code phyMinOffset} and records that entry's
+     * logical offset. When no such entry is found, {@code minLogicOffset} is left as is.
+     *
+     * @param phyMinOffset smallest commit-log offset that is still valid
+     */
+    public void correctMinOffset(long phyMinOffset) {
+        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
+        if (mappedFile != null) {
+            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
+            if (result != null) {
+                try {
+
+                    for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                        long offsetPy = result.getByteBuffer().getLong();
+                        // Size and tags code are read only to move on to the next entry.
+                        result.getByteBuffer().getInt();
+                        result.getByteBuffer().getLong();
+
+                        if (offsetPy >= phyMinOffset) {
+                            this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
+                            log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: "
+                                    + this.topic + ", queueId: " + this.queueId);
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    // Report through the store logger instead of stderr so the failure is not lost.
+                    log.error("Exception thrown when correctMinOffset", e);
+                } finally {
+                    result.release();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * @return the minimum logical offset expressed as an entry index, i.e.
+     * {@code minLogicOffset / CQ_STORE_UNIT_SIZE}
+     */
+    public long getMinOffsetInQuque() {
+        final long minLogicBytes = this.minLogicOffset;
+        return minLogicBytes / CQ_STORE_UNIT_SIZE;
+    }
+
+
+    /**
+     * Writes one entry into this queue, retrying up to 30 times with a one second pause
+     * between attempts. On success the store checkpoint's logics timestamp is updated.
+     * When the store is not writeable or every attempt fails, an error is logged and the
+     * running flags are marked with a logics queue error.
+     *
+     * @param offset         commit-log offset of the message
+     * @param size           size of the message
+     * @param tagsCode       tags code of the message
+     * @param storeTimestamp store time recorded in the checkpoint on success
+     * @param logicOffset    index the entry is expected to occupy in this queue
+     */
+    public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
+                                              long logicOffset) {
+        final int maxRetries = 30;
+        // Sampled once; the flag is not re-read between attempts.
+        final boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
+
+        int attempt = 0;
+        while (attempt < maxRetries && canWrite) {
+            if (this.putMessagePositionInfo(offset, size, tagsCode, logicOffset)) {
+                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
+                return;
+            }
+
+            // XXX: warn and notify me
+            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
+                    + " failed, retry " + attempt + " times");
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                log.warn("", e);
+            }
+
+            attempt++;
+        }
+
+        // XXX: warn and notify me
+        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
+        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+    }
+
+    /**
+     * Encodes one entry (offset, size, tags code) and appends it to the mapped file that
+     * covers the expected logical offset.
+     * <p>
+     * An offset that is not greater than {@code maxPhysicOffset} is treated as already
+     * written and reported as success without writing anything. {@code maxPhysicOffset}
+     * is advanced only after the append has succeeded, so a failed append can really be
+     * retried by the caller instead of being mistaken for an already written entry.
+     *
+     * @param offset   commit-log offset of the message
+     * @param size     size of the message
+     * @param tagsCode tags code of the message
+     * @param cqOffset index the entry is expected to occupy in this queue
+     * @return {@code true} when the entry was appended or had been written before;
+     * {@code false} when no mapped file is available or the append failed
+     */
+    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
+                                           final long cqOffset) {
+
+        if (offset <= this.maxPhysicOffset) {
+            return true;
+        }
+
+        // Rewind the scratch buffer and encode the 20-byte entry.
+        this.byteBufferIndex.flip();
+        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
+        this.byteBufferIndex.putLong(offset);
+        this.byteBufferIndex.putInt(size);
+        this.byteBufferIndex.putLong(tagsCode);
+
+        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
+
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
+        if (mappedFile != null) {
+
+            // First file of the queue, but the first entry does not start at index 0:
+            // pad the space in front of it so positions inside the file line up.
+            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
+                this.minLogicOffset = expectLogicOffset;
+                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
+                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
+                this.fillPreBlank(mappedFile, expectLogicOffset);
+                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+                        + mappedFile.getWrotePosition());
+            }
+
+            if (cqOffset != 0) {
+                // Only reported, not corrected: the entry is still appended at the current position.
+                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+                if (expectLogicOffset != currentLogicOffset) {
+                    LOG_ERROR.warn(
+                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+                            expectLogicOffset,
+                            currentLogicOffset,
+                            this.topic,
+                            this.queueId,
+                            expectLogicOffset - currentLogicOffset
+                    );
+                }
+            }
+
+            final boolean appended = mappedFile.appendMessage(this.byteBufferIndex.array());
+            if (appended) {
+                // Advance only on success; otherwise a retry would hit the guard at the top
+                // of this method and report success although nothing was written.
+                this.maxPhysicOffset = offset;
+            }
+            return appended;
+        }
+        return false;
+    }
+
+
+    private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
+        byteBuffer.putLong(0L);
+        byteBuffer.putInt(Integer.MAX_VALUE);
+        byteBuffer.putLong(0L);
+
+        int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
+        for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
+            mappedFile.appendMessage(byteBuffer.array());
+        }
+    }
+
+    /**
+     * Selects the mapped buffer that begins at the entry with index {@code startIndex}.
+     *
+     * @param startIndex index of the first entry wanted
+     * @return the result of {@code selectMappedBuffer} for that position, or {@code null}
+     * when the index lies below the minimum logical offset or no mapped file covers it
+     */
+    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
+        final long logicOffset = startIndex * CQ_STORE_UNIT_SIZE;
+        if (logicOffset < this.getMinLogicOffset()) {
+            return null;
+        }
+
+        final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(logicOffset);
+        if (target == null) {
+            return null;
+        }
+
+        // Position of the entry relative to the start of its file.
+        return target.selectMappedBuffer((int) (logicOffset % this.mappedFileSize));
+    }
+
+    /**
+     * @return the minimum logical offset of this queue, in bytes
+     */
+    public long getMinLogicOffset() {
+        return this.minLogicOffset;
+    }
+
+    /**
+     * Overrides the minimum logical offset of this queue.
+     *
+     * @param minLogicOffset new minimum logical offset, in bytes
+     */
+    public void setMinLogicOffset(long minLogicOffset) {
+        final long updated = minLogicOffset;
+        this.minLogicOffset = updated;
+    }
+
+    /**
+     * Computes the index of the first entry of the file that follows the file containing
+     * {@code index}.
+     *
+     * @param index entry index inside some file of this queue
+     * @return {@code index} rounded up to the next multiple of the number of entries per
+     * file (a full file further when {@code index} already is such a multiple)
+     */
+    public long rollNextFile(final long index) {
+        final int unitsPerFile = this.mappedFileSize / CQ_STORE_UNIT_SIZE;
+        final long positionInFile = index % unitsPerFile;
+        return index + unitsPerFile - positionInFile;
+    }
+
+    /**
+     * @return the topic this queue belongs to
+     */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * @return the id of this queue within its topic
+     */
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    /**
+     * @return the current value of {@code maxPhysicOffset}; -1 after construction or
+     * {@link #destroy()}
+     */
+    public long getMaxPhysicOffset() {
+        return this.maxPhysicOffset;
+    }
+
+    /**
+     * Overrides the recorded maximum commit-log offset.
+     *
+     * @param maxPhysicOffset new value of {@code maxPhysicOffset}
+     */
+    public void setMaxPhysicOffset(long maxPhysicOffset) {
+        final long updated = maxPhysicOffset;
+        this.maxPhysicOffset = updated;
+    }
+
+    /**
+     * Resets the offsets of this queue to their initial values and destroys the
+     * underlying mapped file queue.
+     */
+    public void destroy() {
+        // Back to the values the fields are declared with.
+        this.maxPhysicOffset = -1L;
+        this.minLogicOffset = 0L;
+
+        this.mappedFileQueue.destroy();
+    }
+
+    /**
+     * @return the difference between the maximum and the minimum entry index of this queue
+     */
+    public long getMessageTotalInQueue() {
+        final long maxIndex = this.getMaxOffsetInQuque();
+        final long minIndex = this.getMinOffsetInQuque();
+        return maxIndex - minIndex;
+    }
+
+
+    /**
+     * @return the maximum offset of the mapped file queue expressed as an entry index,
+     * i.e. {@code mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE}
+     */
+    public long getMaxOffsetInQuque() {
+        final long maxIndex = this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
+        return maxIndex;
+    }
+
+
+    /**
+     * Delegates to {@code mappedFileQueue.checkSelf()}.
+     */
+    public void checkSelf() {
+        this.mappedFileQueue.checkSelf();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
new file mode 100644
index 0000000..2086438
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+
+/**
+ * {@link MessageFilter} that matches a message by looking up its tags code in the
+ * subscription's code set.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMessageFilter implements MessageFilter {
+
+    /**
+     * Decides whether a message with the given tags code matches a subscription.
+     *
+     * @param subscriptionData subscription to check against; may be {@code null}
+     * @param tagsCode         tags code of the message; may be {@code null}
+     * @return {@code true} when either argument is {@code null}, when the subscription is
+     * in class filter mode, when it subscribes to {@code SubscriptionData.SUB_ALL}, or
+     * when its code set contains {@code tagsCode.intValue()}; {@code false} otherwise
+     */
+    @Override
+    public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
+        // Short-circuit evaluation keeps the null checks in front of every dereference.
+        final boolean acceptWithoutLookup = tagsCode == null
+                || subscriptionData == null
+                || subscriptionData.isClassFilterMode()
+                || subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL);
+        if (acceptWithoutLookup) {
+            return true;
+        }
+
+        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+    }
+
+}