You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/12 09:36:37 UTC

[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5351: [ISSUE #3799]main compaction process

fuyou001 commented on code in PR #5351:
URL: https://github.com/apache/rocketmq/pull/5351#discussion_r1045457078


##########
tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class DumpCompactionLogCommand implements SubCommand {
+    @Override
+    public String commandDesc() {
+        return "parse compaction log to message";
+    }
+
+    @Override
+    public String commandName() {
+        return "dumpCompactionLog";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("f", "file", true, "to dump file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+            throws SubCommandException {
+        if (commandLine.hasOption("f")) {
+            String fileName = commandLine.getOptionValue("f");
+            Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                throw new SubCommandException("file " + fileName + " not exist.");
+            }
+
+            if (Files.isDirectory(filePath)) {
+                throw new SubCommandException("file " + fileName + " is a directory.");
+            }
+
+            try {
+                long fileSize = Files.size(filePath);
+                FileChannel fileChannel = new RandomAccessFile(fileName, "rw").getChannel();
+                ByteBuffer buf = fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+
+                int current = 0;
+                while (current < fileSize) {
+                    buf.position(current);
+                    ByteBuffer bb = buf.slice();
+                    int size = bb.getInt();
+                    if (size > buf.capacity() || size < 0) {
+                        break;
+                    } else {
+                        bb.limit(size);
+                        bb.rewind();
+                    }
+
+                    MessageExt messageExt = MessageDecoder.decode(bb, false, false);
+                    if (messageExt == null) {
+                        break;
+                    } else {
+                        current += size;
+                        System.out.printf(messageExt + "\n");
+                    }
+                }
+
+                UtilAll.cleanBuffer(buf);

Review Comment:
   It would be better to move `UtilAll.cleanBuffer(buf)` into a `finally` block, so the mapped buffer is released even if an exception is thrown while decoding.



##########
tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class DumpCompactionLogCommand implements SubCommand {
+    @Override
+    public String commandDesc() {
+        return "parse compaction log to message";
+    }
+
+    @Override
+    public String commandName() {
+        return "dumpCompactionLog";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("f", "file", true, "to dump file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+            throws SubCommandException {
+        if (commandLine.hasOption("f")) {
+            String fileName = commandLine.getOptionValue("f");
+            Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                throw new SubCommandException("file " + fileName + " not exist.");
+            }
+
+            if (Files.isDirectory(filePath)) {
+                throw new SubCommandException("file " + fileName + " is a directory.");
+            }
+
+            try {
+                long fileSize = Files.size(filePath);
+                FileChannel fileChannel = new RandomAccessFile(fileName, "rw").getChannel();

Review Comment:
   Open the file in read-only mode: change "rw" to "r".



##########
store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.kv;
+
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CleanupPolicy;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.CommitLog;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class CompactionService extends ServiceThread {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final CompactionStore compactionStore;
+    private final DefaultMessageStore defaultMessageStore;
+    private final CommitLog commitLog;
+    private final LinkedBlockingQueue<TopicPartitionOffset> compactionMsgQ = new LinkedBlockingQueue<>();
+
+    public CompactionService(CommitLog commitLog, DefaultMessageStore messageStore, CompactionStore compactionStore) {
+        this.commitLog = commitLog;
+        this.defaultMessageStore = messageStore;
+        this.compactionStore = compactionStore;
+    }
+
+    public void putRequest(DispatchRequest request) {
+        if (request == null) {
+            return;
+        }
+
+        String topic = request.getTopic();
+        Optional<TopicConfig> topicConfig = defaultMessageStore.getTopicConfig(topic);
+        CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(topicConfig);
+        //check request topic flag
+        if (Objects.equals(policy, CleanupPolicy.COMPACTION)) {
+            int queueId = request.getQueueId();
+            long physicalOffset = request.getCommitLogOffset();
+            TopicPartitionOffset tpo = new TopicPartitionOffset(topic, queueId, physicalOffset);
+            compactionMsgQ.offer(tpo);

Review Comment:
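   OOM risk: `compactionMsgQ` is an unbounded `LinkedBlockingQueue`, so it can grow without limit if requests are offered faster than they are consumed.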



##########
tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class DumpCompactionLogCommand implements SubCommand {
+    @Override
+    public String commandDesc() {
+        return "parse compaction log to message";
+    }
+
+    @Override
+    public String commandName() {
+        return "dumpCompactionLog";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("f", "file", true, "to dump file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+            throws SubCommandException {
+        if (commandLine.hasOption("f")) {
+            String fileName = commandLine.getOptionValue("f");
+            Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                throw new SubCommandException("file " + fileName + " not exist.");
+            }
+
+            if (Files.isDirectory(filePath)) {
+                throw new SubCommandException("file " + fileName + " is a directory.");
+            }
+
+            try {
+                long fileSize = Files.size(filePath);
+                FileChannel fileChannel = new RandomAccessFile(fileName, "rw").getChannel();

Review Comment:
   READ_ONLY is better here.



##########
store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.kv;
+
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CompactionPositionMgr extends ConfigManager {
+
+    public static final String CHECKPOINT_FILE = "position-checkpoint";
+
+    private transient String compactionPath;
+    private transient String checkpointFileName;
+
+    private ConcurrentHashMap<String, Long> queueOffsetMap = new ConcurrentHashMap<>();
+
+    private CompactionPositionMgr() {
+
+    }
+
+    public CompactionPositionMgr(final String compactionPath) {
+        this.compactionPath = compactionPath;
+        this.checkpointFileName = compactionPath + File.separator + CHECKPOINT_FILE;
+        this.load();
+    }
+
+    public void setOffset(String topic, int queueId, final long offset) {
+        queueOffsetMap.put(topic + "_" + queueId, offset);
+    }
+
+    public long getOffset(String topic, int queueId) {
+        return queueOffsetMap.getOrDefault(topic + "_" + queueId, -1L);
+    }
+
+    public boolean isEmpty() {
+        return queueOffsetMap.isEmpty();
+    }
+
+    public boolean isCompaction(String topic, int queueId, long offset) {
+        return getOffset(topic, queueId) > offset;
+    }
+
+    @Override
+    public String configFilePath() {
+        return checkpointFileName;
+    }
+
+    @Override
+    public String encode() {
+        return this.encode(false);
+    }
+
+    @Override
+    public String encode(boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);

Review Comment:
   When the number of topics is large, the broker is at risk of full GC (FGC), as with TopicConfigManager.



##########
store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.kv;
+
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class CompactionStore {
+
+    public static final String COMPACTION_DIR = "compaction";
+    public static final String COMPACTION_LOG_DIR = "compactionLog";
+    public static final String COMPACTION_CQ_DIR = "compactionCq";
+
+    private final String compactionPath;
+    private final String compactionLogPath;
+    private final String compactionCqPath;
+    private final MessageStore defaultMessageStore;
+    private final CompactionPositionMgr positionMgr;
+    private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
+    private final ScheduledExecutorService compactionSchedule;
+    private final int compactionInterval;
+    private final int compactionThreadNum;
+    private final int offsetMapSize;
+    private String masterAddr;
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public CompactionStore(MessageStore defaultMessageStore) {
+        this.defaultMessageStore = defaultMessageStore;
+        this.compactionLogTable = new ConcurrentHashMap<>();
+        MessageStoreConfig config = defaultMessageStore.getMessageStoreConfig();
+        String storeRootPath = config.getStorePathRootDir();
+        this.compactionPath = Paths.get(storeRootPath, COMPACTION_DIR).toString();
+        this.compactionLogPath = Paths.get(compactionPath, COMPACTION_LOG_DIR).toString();
+        this.compactionCqPath = Paths.get(compactionPath, COMPACTION_CQ_DIR).toString();
+        this.positionMgr = new CompactionPositionMgr(compactionPath);
+        if (config.getCompactionThreadNum() <= 0) {
+            this.compactionThreadNum = Runtime.getRuntime().availableProcessors();

Review Comment:
   compactionThreadNum must have an upper bound when the broker is deployed on a physical machine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org