You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/12/20 15:05:57 UTC

[iotdb] branch master updated: Add interface to set datatype of mqtt (#8548)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1a670675 Add interface to set datatype of mqtt (#8548)
7e1a670675 is described below

commit 7e1a67067582d24422b0a43bab6bfb01c8e4b4e6
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Tue Dec 20 23:05:51 2022 +0800

    Add interface to set datatype of mqtt (#8548)
---
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java      | 18 +++++++++++++++---
 .../org/apache/iotdb/db/protocol/mqtt/Message.java     | 11 +++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 2e0dea34f4..a55522c185 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -138,9 +139,20 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
         statement.setDevicePath(new PartialPath(event.getDevice()));
         statement.setTime(event.getTimestamp());
         statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
-        statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
-        statement.setValues(event.getValues().toArray(new Object[0]));
-        statement.setNeedInferType(true);
+        if (event.getDataTypes() == null) {
+          statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
+          statement.setValues(event.getValues().toArray(new Object[0]));
+          statement.setNeedInferType(true);
+        } else {
+          List<TSDataType> dataTypes = event.getDataTypes();
+          List<String> values = event.getValues();
+          Object[] inferredValues = new Object[values.size()];
+          for (int i = 0; i < values.size(); ++i) {
+            inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
+          }
+          statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+          statement.setValues(inferredValues);
+        }
         statement.setAligned(false);
 
         tsStatus = AuthorityChecker.checkAuthority(statement, session);
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
index e0f09a34eb..f0eed5a566 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
@@ -18,6 +18,8 @@
 
 package org.apache.iotdb.db.protocol.mqtt;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
 import java.util.List;
 
 /** Message describes the information sometime sent from the devices. */
@@ -25,6 +27,7 @@ public class Message {
   private String device;
   private Long timestamp;
   private List<String> measurements;
+  private List<TSDataType> dataTypes;
   private List<String> values;
 
   public String getDevice() {
@@ -51,6 +54,14 @@ public class Message {
     this.measurements = measurements;
   }
 
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
   public List<String> getValues() {
     return values;
   }