You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/11/17 08:02:53 UTC

[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #359: add mqtt source connector

odbozhou commented on code in PR #359:
URL: https://github.com/apache/rocketmq-connect/pull/359#discussion_r1024856480


##########
connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.util.MqttConnectionUtil;
+import org.apache.rocketmq.connect.mqtt.util.Utils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private final Logger log = LoggerFactory.getLogger(Replicator.class);
+
+    private MqttClient mqttClient;
+    private SourceConnectorConfig sourceConnectConfig;
+    private BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(SourceConnectorConfig sourceConnectConfig) {
+        this.sourceConnectConfig = sourceConnectConfig;
+    }
+
+    public void start() throws Exception {
+        String brokerUrl = sourceConnectConfig.getMqttBrokerUrl();
+        MemoryPersistence memoryPersistence = new MemoryPersistence();
+        String sourceTopic = sourceConnectConfig.getSourceTopic();
+        String recvClientId = Utils.createClientId("recv");
+        MqttConnectOptions mqttConnectOptions = MqttConnectionUtil.buildMqttConnectOptions(recvClientId, sourceConnectConfig);
+        mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                log.info("{} connect success to {}", recvClientId, serverURI);
+                try {
+                    mqttClient.subscribe(sourceTopic, 1);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            @Override
+            public void connectionLost(Throwable throwable) {
+                throwable.printStackTrace();
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage mqttMessage) {
+                try {
+                    commit(mqttMessage, true);
+                } catch (Exception e) {
+                    throw new RuntimeException("commit MqttMessage failed", e);
+                }
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+            }
+        });
+
+        try {
+            mqttClient.connect(mqttConnectOptions);
+            log.info("Replicator start succeed");
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("connect fail");

Review Comment:
   Please use log.error to replace System.out.println.



##########
connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.util.MqttConnectionUtil;
+import org.apache.rocketmq.connect.mqtt.util.Utils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private final Logger log = LoggerFactory.getLogger(Replicator.class);
+
+    private MqttClient mqttClient;
+    private SourceConnectorConfig sourceConnectConfig;
+    private BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(SourceConnectorConfig sourceConnectConfig) {
+        this.sourceConnectConfig = sourceConnectConfig;
+    }
+
+    public void start() throws Exception {
+        String brokerUrl = sourceConnectConfig.getMqttBrokerUrl();
+        MemoryPersistence memoryPersistence = new MemoryPersistence();
+        String sourceTopic = sourceConnectConfig.getSourceTopic();
+        String recvClientId = Utils.createClientId("recv");
+        MqttConnectOptions mqttConnectOptions = MqttConnectionUtil.buildMqttConnectOptions(recvClientId, sourceConnectConfig);
+        mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                log.info("{} connect success to {}", recvClientId, serverURI);
+                try {
+                    mqttClient.subscribe(sourceTopic, 1);
+                } catch (Exception e) {
+                    e.printStackTrace();

Review Comment:
   Should this print a proper error log (e.g. via log.error) instead?



##########
connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/source/Replicator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.source;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.util.MqttConnectionUtil;
+import org.apache.rocketmq.connect.mqtt.util.Utils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Replicator {
+
+    private final Logger log = LoggerFactory.getLogger(Replicator.class);
+
+    private MqttClient mqttClient;
+    private SourceConnectorConfig sourceConnectConfig;
+    private BlockingQueue<MqttMessage> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(SourceConnectorConfig sourceConnectConfig) {
+        this.sourceConnectConfig = sourceConnectConfig;
+    }
+
+    public void start() throws Exception {
+        String brokerUrl = sourceConnectConfig.getMqttBrokerUrl();
+        MemoryPersistence memoryPersistence = new MemoryPersistence();
+        String sourceTopic = sourceConnectConfig.getSourceTopic();
+        String recvClientId = Utils.createClientId("recv");
+        MqttConnectOptions mqttConnectOptions = MqttConnectionUtil.buildMqttConnectOptions(recvClientId, sourceConnectConfig);
+        mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
+        mqttClient.setCallback(new MqttCallbackExtended() {
+            @Override
+            public void connectComplete(boolean reconnect, String serverURI) {
+                log.info("{} connect success to {}", recvClientId, serverURI);
+                try {
+                    mqttClient.subscribe(sourceTopic, 1);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            @Override
+            public void connectionLost(Throwable throwable) {
+                throwable.printStackTrace();

Review Comment:
   Should this print a proper error log (e.g. via log.error) instead?



##########
connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceConnector.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+
+public class MqttSourceConnector extends SourceConnector {
+
+    private KeyValue config;
+
+    @Override public void start(KeyValue config) {
+        for (String requestKey : SourceConnectorConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                throw new RuntimeException("Request config key: " + requestKey);

Review Comment:
   ConnectException may be better



##########
connectors/rocketmq-connect-mqtt/src/main/java/org/apache/rocketmq/connect/mqtt/connector/MqttSourceTask.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.mqtt.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.mqtt.config.SourceConnectorConfig;
+import org.apache.rocketmq.connect.mqtt.source.Replicator;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttSourceTask.class);
+
+    private Replicator replicator;
+
+    private SourceConnectorConfig sourceConnectConfig;
+
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        try {
+            MqttMessage message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                res.add(message2ConnectRecord(message));
+            }
+        } catch (Exception e) {
+            log.error("mqtt task poll error, current config:" + JSON.toJSONString(sourceConnectConfig), e);
+        }
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.sourceConnectConfig = new SourceConnectorConfig();
+            this.sourceConnectConfig.load(props);
+            this.replicator = new Replicator(sourceConnectConfig);
+            this.replicator.start();
+        } catch (Exception e) {
+            log.error("mqtt task start failed.", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            replicator.stop();
+        } catch (Exception e) {
+            log.error("mqtt task stop failed.", e);
+        }
+    }
+
+    private ConnectRecord message2ConnectRecord(MqttMessage message) {
+        Schema schema = SchemaBuilder.struct().name("mqtt").build();

Review Comment:
   Should struct().name be filled in with the topic name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org