You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/12/20 15:05:57 UTC
[iotdb] branch master updated: Add interface to set datatype of mqtt (#8548)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1a670675 Add interface to set datatype of mqtt (#8548)
7e1a670675 is described below
commit 7e1a67067582d24422b0a43bab6bfb01c8e4b4e6
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Tue Dec 20 23:05:51 2022 +0800
Add interface to set datatype of mqtt (#8548)
---
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 18 +++++++++++++++---
.../org/apache/iotdb/db/protocol/mqtt/Message.java | 11 +++++++++++
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 2e0dea34f4..a55522c185 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -138,9 +139,20 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
statement.setDevicePath(new PartialPath(event.getDevice()));
statement.setTime(event.getTimestamp());
statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
- statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
- statement.setValues(event.getValues().toArray(new Object[0]));
- statement.setNeedInferType(true);
+ if (event.getDataTypes() == null) {
+ statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
+ statement.setValues(event.getValues().toArray(new Object[0]));
+ statement.setNeedInferType(true);
+ } else {
+ List<TSDataType> dataTypes = event.getDataTypes();
+ List<String> values = event.getValues();
+ Object[] inferredValues = new Object[values.size()];
+ for (int i = 0; i < values.size(); ++i) {
+ inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
+ }
+ statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+ statement.setValues(inferredValues);
+ }
statement.setAligned(false);
tsStatus = AuthorityChecker.checkAuthority(statement, session);
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
index e0f09a34eb..f0eed5a566 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
@@ -18,6 +18,8 @@
package org.apache.iotdb.db.protocol.mqtt;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
import java.util.List;
/** Message describes the information sometime sent from the devices. */
@@ -25,6 +27,7 @@ public class Message {
private String device;
private Long timestamp;
private List<String> measurements;
+ private List<TSDataType> dataTypes;
private List<String> values;
public String getDevice() {
@@ -51,6 +54,14 @@ public class Message {
this.measurements = measurements;
}
+ public List<TSDataType> getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(List<TSDataType> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
public List<String> getValues() {
return values;
}