Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/08/01 07:29:31 UTC

[GitHub] [rocketmq-mqtt] DongyuanPan commented on a diff in pull request #131: Retained message based on raft state machine

DongyuanPan commented on code in PR #131:
URL: https://github.com/apache/rocketmq-mqtt/pull/131#discussion_r934191130


##########
mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/hook/AbstractUpstreamHook.java:
##########
@@ -63,6 +65,6 @@ public CompletableFuture<HookResult> doHook(MqttMessageUpContext context, MqttMe
 
     public abstract void register();
 
-    public abstract CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message);
+    public abstract CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) throws RemotingException, com.alipay.sofa.jraft.error.RemotingException, ExecutionException, InterruptedException;

Review Comment:
   Please make sure that non-fatal errors are caught and handled, so that a recoverable exception does not terminate the program.
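
   One way to keep a recoverable failure from escaping is to handle it on the returned future instead of widening the method signature. The sketch below is only an illustration: the class name, the `withFallback` helper, and the plain `String` result are hypothetical stand-ins, not code from this PR, and the real hook would return a `HookResult`.

```java
import java.util.concurrent.CompletableFuture;

public class NonFatalHandlingSketch {

    // Hypothetical helper: maps a failed upstream future to a fallback value
    // so the failure is reported instead of propagating to the caller.
    static CompletableFuture<String> withFallback(CompletableFuture<String> upstream) {
        return upstream.handle((value, error) -> {
            if (error != null) {
                // Recoverable: record the failure and continue with a marker value.
                return "FAILED: " + error.getMessage();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("raft group unavailable"));

        // Neither call throws; the failure is turned into an ordinary result.
        System.out.println(withFallback(failing).join());
        System.out.println(withFallback(CompletableFuture.completedFuture("ok")).join());
    }
}
```

   With this shape the caller decides what a failure means (log, retry, reject the publish) without any `throws` clause on `processMqttMessage`.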



##########
mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/model/Constants.java:
##########
@@ -41,4 +41,6 @@ public class Constants {
 
     public static final String MQTT_TAG = "MQTT_COMMON";
 
+    public static final String PROPERTY_ORIGIN_MQTT_ISEMPTY_MSG = "isEmptyMsg";

Review Comment:
   It would be better to write this in all capitals with underscores: IS_EMPTY_MSG



##########
mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java:
##########
@@ -39,22 +41,28 @@
 public class MessageUtil {
     public static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
 
-    public static MqttPublishMessage toMqttMessage(String topicName, byte[] body, int qos, int mqttId) {
+
+
+    public static final String EMPTYSTRING = "%@!@%";

Review Comment:
   Use special characters here instead, such as \n or \r



##########
meta/src/test/java/org/apache/rocketmq/mqtt/meta/raft/RetainedMsgClient.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.meta.raft;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alipay.sofa.jraft.RouteTable;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.option.CliOptions;
+import com.alipay.sofa.jraft.rpc.InvokeCallback;
+import com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory;
+import com.alipay.sofa.jraft.rpc.impl.MarshallerRegistry;
+import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
+import com.alipay.sofa.jraft.util.RpcFactoryHelper;
+import org.apache.rocketmq.mqtt.common.model.Message;
+import org.apache.rocketmq.mqtt.common.model.Trie;
+import org.apache.rocketmq.mqtt.common.model.consistency.ReadRequest;
+import org.apache.rocketmq.mqtt.common.model.consistency.Response;
+import org.apache.rocketmq.mqtt.common.model.consistency.WriteRequest;
+import org.apache.rocketmq.mqtt.common.util.TopicUtils;
+import org.apache.rocketmq.mqtt.meta.raft.processor.Constants;
+
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+public class RetainedMsgClient {

Review Comment:
   Please write this as a unit test.



##########
mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/UpstreamProcessorManager.java:
##########
@@ -64,7 +66,7 @@ public void register() {
     }
 
     @Override
-    public CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) {
+    public CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) throws RemotingException, com.alipay.sofa.jraft.error.RemotingException, ExecutionException, InterruptedException {

Review Comment:
   Is this thrown exception handled anywhere? Will it cause the program to exit? If it propagates to the outside and causes the program to exit, I think you should catch it with try/catch at the exact point where it is thrown.
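
   A minimal sketch of catching at the point of the throw, assuming the checked exceptions come from a blocking remote call. `RemoteCall`, `process`, and the `String` result are hypothetical names used only for illustration; they are not part of rocketmq-mqtt or JRaft.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CatchAtSourceSketch {

    /** Hypothetical stand-in for a call that declares checked exceptions. */
    interface RemoteCall {
        String invoke() throws ExecutionException, InterruptedException;
    }

    // No "throws" clause: each checked exception is caught where it is raised
    // and reported through the returned future.
    static CompletableFuture<String> process(RemoteCall call) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            future.complete(call.invoke());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            future.completeExceptionally(e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the underlying cause when there is one.
            future.completeExceptionally(e.getCause() != null ? e.getCause() : e);
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = process(() -> "stored");
        CompletableFuture<String> failed = process(() -> {
            throw new ExecutionException(new IllegalStateException("leader not found"));
        });
        System.out.println(ok.join());
        System.out.println(failed.isCompletedExceptionally());
    }
}
```

   The overriding method in `UpstreamProcessorManager` could then keep its original signature, since nothing checked leaves the method.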



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org