You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2024/03/05 02:29:46 UTC
(iotdb) 15/22: [IOTDB-6296] Fix memory leak in MQTTService
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6216d3ebbc945feeb6bfc23a15928f6b97ebf8d5
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Mon Feb 19 20:22:52 2024 +0800
[IOTDB-6296] Fix memory leak in MQTTService
---
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 151 +++++++++++----------
1 file changed, 78 insertions(+), 73 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a7aa43f1687..3c0f3c05726 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -102,84 +102,89 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
@Override
public void onPublish(InterceptPublishMessage msg) {
- String clientId = msg.getClientID();
- if (!clientIdToSessionMap.containsKey(clientId)) {
- return;
- }
- MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
- ByteBuf payload = msg.getPayload();
- String topic = msg.getTopicName();
- String username = msg.getUsername();
- MqttQoS qos = msg.getQos();
-
- LOG.debug(
- "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
- clientId,
- username,
- qos,
- topic,
- payload);
-
- List<Message> events = payloadFormat.format(payload);
- if (events == null) {
- return;
- }
-
- for (Message event : events) {
- if (event == null) {
- continue;
+ try {
+ String clientId = msg.getClientID();
+ if (!clientIdToSessionMap.containsKey(clientId)) {
+ return;
+ }
+ MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
+ ByteBuf payload = msg.getPayload();
+ String topic = msg.getTopicName();
+ String username = msg.getUsername();
+ MqttQoS qos = msg.getQos();
+
+ LOG.debug(
+ "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+ clientId,
+ username,
+ qos,
+ topic,
+ payload);
+
+ List<Message> events = payloadFormat.format(payload);
+ if (events == null) {
+ return;
}
- TSStatus tsStatus = null;
- try {
- InsertRowStatement statement = new InsertRowStatement();
- statement.setDevicePath(
- DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
- TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
- statement.setTime(event.getTimestamp());
- statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
- if (event.getDataTypes() == null) {
- statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
- statement.setValues(event.getValues().toArray(new Object[0]));
- statement.setNeedInferType(true);
- } else {
- List<TSDataType> dataTypes = event.getDataTypes();
- List<String> values = event.getValues();
- Object[] inferredValues = new Object[values.size()];
- for (int i = 0; i < values.size(); ++i) {
- inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
- }
- statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
- statement.setValues(inferredValues);
+ for (Message event : events) {
+ if (event == null) {
+ continue;
}
- statement.setAligned(false);
-
- tsStatus = AuthorityChecker.checkAuthority(statement, session);
- if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOG.warn(tsStatus.message);
- } else {
- long queryId = sessionManager.requestQueryId();
- ExecutionResult result =
- Coordinator.getInstance()
- .execute(
- statement,
- queryId,
- sessionManager.getSessionInfo(session),
- "",
- partitionFetcher,
- schemaFetcher,
- config.getQueryTimeoutThreshold());
- tsStatus = result.status;
+
+ TSStatus tsStatus = null;
+ try {
+ InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(
+ DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
+ TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
+ statement.setTime(event.getTimestamp());
+ statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
+ if (event.getDataTypes() == null) {
+ statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
+ statement.setValues(event.getValues().toArray(new Object[0]));
+ statement.setNeedInferType(true);
+ } else {
+ List<TSDataType> dataTypes = event.getDataTypes();
+ List<String> values = event.getValues();
+ Object[] inferredValues = new Object[values.size()];
+ for (int i = 0; i < values.size(); ++i) {
+ inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
+ }
+ statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+ statement.setValues(inferredValues);
+ }
+ statement.setAligned(false);
+
+ tsStatus = AuthorityChecker.checkAuthority(statement, session);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOG.warn(tsStatus.message);
+ } else {
+ long queryId = sessionManager.requestQueryId();
+ ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ sessionManager.getSessionInfo(session),
+ "",
+ partitionFetcher,
+ schemaFetcher,
+ config.getQueryTimeoutThreshold());
+ tsStatus = result.status;
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "meet error when inserting device {}, measurements {}, at time {}, because ",
+ event.getDevice(),
+ event.getMeasurements(),
+ event.getTimestamp(),
+ e);
}
- } catch (Exception e) {
- LOG.warn(
- "meet error when inserting device {}, measurements {}, at time {}, because ",
- event.getDevice(),
- event.getMeasurements(),
- event.getTimestamp(),
- e);
+ LOG.debug("event process result: {}", tsStatus);
}
- LOG.debug("event process result: {}", tsStatus);
+ } finally {
+ // release the payload of the message
+ super.onPublish(msg);
}
}