You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ha...@apache.org on 2024/03/05 02:29:46 UTC

(iotdb) 15/22: [IOTDB-6296] Fix memory leak in MQTTService

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6216d3ebbc945feeb6bfc23a15928f6b97ebf8d5
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Mon Feb 19 20:22:52 2024 +0800

    [IOTDB-6296] Fix memory leak in MQTTService
---
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  | 151 +++++++++++----------
 1 file changed, 78 insertions(+), 73 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a7aa43f1687..3c0f3c05726 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -102,84 +102,89 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
-    String clientId = msg.getClientID();
-    if (!clientIdToSessionMap.containsKey(clientId)) {
-      return;
-    }
-    MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
-    ByteBuf payload = msg.getPayload();
-    String topic = msg.getTopicName();
-    String username = msg.getUsername();
-    MqttQoS qos = msg.getQos();
-
-    LOG.debug(
-        "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
-        clientId,
-        username,
-        qos,
-        topic,
-        payload);
-
-    List<Message> events = payloadFormat.format(payload);
-    if (events == null) {
-      return;
-    }
-
-    for (Message event : events) {
-      if (event == null) {
-        continue;
+    try {
+      String clientId = msg.getClientID();
+      if (!clientIdToSessionMap.containsKey(clientId)) {
+        return;
+      }
+      MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
+      ByteBuf payload = msg.getPayload();
+      String topic = msg.getTopicName();
+      String username = msg.getUsername();
+      MqttQoS qos = msg.getQos();
+
+      LOG.debug(
+          "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+          clientId,
+          username,
+          qos,
+          topic,
+          payload);
+
+      List<Message> events = payloadFormat.format(payload);
+      if (events == null) {
+        return;
       }
 
-      TSStatus tsStatus = null;
-      try {
-        InsertRowStatement statement = new InsertRowStatement();
-        statement.setDevicePath(
-            DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
-        TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
-        statement.setTime(event.getTimestamp());
-        statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
-        if (event.getDataTypes() == null) {
-          statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
-          statement.setValues(event.getValues().toArray(new Object[0]));
-          statement.setNeedInferType(true);
-        } else {
-          List<TSDataType> dataTypes = event.getDataTypes();
-          List<String> values = event.getValues();
-          Object[] inferredValues = new Object[values.size()];
-          for (int i = 0; i < values.size(); ++i) {
-            inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
-          }
-          statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
-          statement.setValues(inferredValues);
+      for (Message event : events) {
+        if (event == null) {
+          continue;
         }
-        statement.setAligned(false);
-
-        tsStatus = AuthorityChecker.checkAuthority(statement, session);
-        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          LOG.warn(tsStatus.message);
-        } else {
-          long queryId = sessionManager.requestQueryId();
-          ExecutionResult result =
-              Coordinator.getInstance()
-                  .execute(
-                      statement,
-                      queryId,
-                      sessionManager.getSessionInfo(session),
-                      "",
-                      partitionFetcher,
-                      schemaFetcher,
-                      config.getQueryTimeoutThreshold());
-          tsStatus = result.status;
+
+        TSStatus tsStatus = null;
+        try {
+          InsertRowStatement statement = new InsertRowStatement();
+          statement.setDevicePath(
+              DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
+          TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
+          statement.setTime(event.getTimestamp());
+          statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
+          if (event.getDataTypes() == null) {
+            statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
+            statement.setValues(event.getValues().toArray(new Object[0]));
+            statement.setNeedInferType(true);
+          } else {
+            List<TSDataType> dataTypes = event.getDataTypes();
+            List<String> values = event.getValues();
+            Object[] inferredValues = new Object[values.size()];
+            for (int i = 0; i < values.size(); ++i) {
+              inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
+            }
+            statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+            statement.setValues(inferredValues);
+          }
+          statement.setAligned(false);
+
+          tsStatus = AuthorityChecker.checkAuthority(statement, session);
+          if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            LOG.warn(tsStatus.message);
+          } else {
+            long queryId = sessionManager.requestQueryId();
+            ExecutionResult result =
+                Coordinator.getInstance()
+                    .execute(
+                        statement,
+                        queryId,
+                        sessionManager.getSessionInfo(session),
+                        "",
+                        partitionFetcher,
+                        schemaFetcher,
+                        config.getQueryTimeoutThreshold());
+            tsStatus = result.status;
+          }
+        } catch (Exception e) {
+          LOG.warn(
+              "meet error when inserting device {}, measurements {}, at time {}, because ",
+              event.getDevice(),
+              event.getMeasurements(),
+              event.getTimestamp(),
+              e);
         }
-      } catch (Exception e) {
-        LOG.warn(
-            "meet error when inserting device {}, measurements {}, at time {}, because ",
-            event.getDevice(),
-            event.getMeasurements(),
-            event.getTimestamp(),
-            e);
+        LOG.debug("event process result: {}", tsStatus);
       }
-      LOG.debug("event process result: {}", tsStatus);
+    } finally {
+      // release the payload of the message
+      super.onPublish(msg);
     }
   }