You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/21 16:28:31 UTC
[iotdb] branch master updated: [IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd145d6 [IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)
fd145d6 is described below
commit fd145d6e5a8f77d1212e1d15759c5bcfcfa7ab82
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Mon Nov 22 00:27:59 2021 +0800
[IOTDB-2038] MqttService accesses to BasicServiceProvider (#4440)
At present, MqttService contains some duplicated code that can be replaced by calls to BasicServiceProvider, so that the same logic does not have to be written twice.
In addition, MqttService does not have a complete authentication mechanism. At present, there is only a user login check, but users do not necessarily have write permission on all paths. By going through BasicServiceProvider, we can also check the user's permission on the target paths before each write.
see: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336794
same pr: https://issues.apache.org/jira/browse/IOTDB-2036
---
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 68 +++++++++++++-------
.../db/service/basic/BasicServiceProvider.java | 6 +-
.../apache/iotdb/db/mqtt/PublishHandlerTest.java | 75 +++++++++++++++++++---
3 files changed, 114 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index ea4ae86..a3d9069 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -18,50 +18,78 @@
package org.apache.iotdb.db.mqtt;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptConnectMessage;
+import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.ZoneId;
import java.util.List;
/** PublishHandler handle the messages from MQTT clients. */
public class PublishHandler extends AbstractInterceptHandler {
+ private final BasicServiceProvider basicServiceProvider;
+ private long sessionId;
+
private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
- private IPlanExecutor executor;
- private PayloadFormatter payloadFormat;
+ private final PayloadFormatter payloadFormat;
public PublishHandler(IoTDBConfig config) {
this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
try {
- this.executor = new PlanExecutor();
+ this.basicServiceProvider = new BasicServiceProvider();
} catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
- protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
- this.executor = executor;
+ protected PublishHandler(PayloadFormatter payloadFormat) {
+ try {
+ this.basicServiceProvider = new BasicServiceProvider();
+ } catch (QueryProcessException e) {
+ throw new RuntimeException(e);
+ }
this.payloadFormat = payloadFormat;
}
@Override
public String getID() {
- return "iotdb-mqtt-broker-listener";
+ return "iotdb-mqtt-broker-listener-" + sessionId;
+ }
+
+ @Override
+ public void onConnect(InterceptConnectMessage msg) {
+ try {
+ BasicOpenSessionResp basicOpenSessionResp =
+ basicServiceProvider.openSession(
+ msg.getUsername(),
+ new String(msg.getPassword()),
+ ZoneId.systemDefault().toString(),
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ sessionId = basicOpenSessionResp.getSessionId();
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onDisconnect(InterceptDisconnectMessage msg) {
+ basicServiceProvider.closeSession(sessionId);
}
@Override
@@ -101,7 +129,12 @@ public class PublishHandler extends AbstractInterceptHandler {
event.getTimestamp(),
event.getMeasurements().toArray(new String[0]),
event.getValues().toArray(new String[0]));
- status = executeNonQuery(plan);
+ TSStatus tsStatus = basicServiceProvider.checkAuthority(plan, sessionId);
+ if (tsStatus != null) {
+ LOG.warn(tsStatus.message);
+ } else {
+ status = basicServiceProvider.executeNonQuery(plan);
+ }
} catch (Exception e) {
LOG.warn(
"meet error when inserting device {}, measurements {}, at time {}, because ",
@@ -114,13 +147,4 @@ public class PublishHandler extends AbstractInterceptHandler {
LOG.debug("event process result: {}", status);
}
}
-
- private boolean executeNonQuery(PhysicalPlan plan)
- throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
- if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
- throw new QueryProcessException(
- "Current system mode is read-only, does not support non-query operation");
- }
- return executor.processNonQuery(plan);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 693e99e..e11db24 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -111,7 +111,7 @@ public class BasicServiceProvider {
return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
}
- protected TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
+ public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
List<? extends PartialPath> paths = plan.getPaths();
try {
if (!checkAuthorization(paths, plan, sessionManager.getUsername(sessionId))) {
@@ -129,7 +129,7 @@ public class BasicServiceProvider {
return null;
}
- protected BasicOpenSessionResp openSession(
+ public BasicOpenSessionResp openSession(
String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion)
throws TException {
BasicOpenSessionResp openSessionResp = new BasicOpenSessionResp();
@@ -183,7 +183,7 @@ public class BasicServiceProvider {
return openSessionResp.sessionId(sessionId);
}
- protected boolean closeSession(long sessionId) {
+ public boolean closeSession(long sessionId) {
AUDIT_LOGGER.info("Session-{} is closing", sessionId);
sessionManager.removeCurrSessionId();
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
index 7a70b92..672a41d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
@@ -17,28 +17,43 @@
*/
package org.apache.iotdb.db.mqtt;
-import org.apache.iotdb.db.qp.executor.IPlanExecutor;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import io.moquette.interception.messages.InterceptConnectMessage;
+import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
+import java.sql.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class PublishHandlerTest {
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
@Test
- public void onPublish() throws Exception {
- IPlanExecutor executor = mock(IPlanExecutor.class);
+ public void onPublish() throws ClassNotFoundException {
PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json");
- PublishHandler handler = new PublishHandler(executor, payloadFormat);
+ PublishHandler handler = new PublishHandler(payloadFormat);
String payload =
"{\n"
@@ -50,13 +65,53 @@ public class PublishHandlerTest {
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ // connect
+ MqttConnectPayload mqttConnectPayload =
+ new MqttConnectPayload(null, null, "test", "root", "root");
+ MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload);
+ InterceptConnectMessage interceptConnectMessage =
+ new InterceptConnectMessage(mqttConnectMessage);
+ handler.onConnect(interceptConnectMessage);
+
+ // publish
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
-
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf);
InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, null, null);
handler.onPublish(message);
- verify(executor).processNonQuery(any(InsertRowPlan.class));
+
+ // disconnect
+ InterceptDisconnectMessage interceptDisconnectMessage =
+ new InterceptDisconnectMessage(null, null);
+ handler.onDisconnect(interceptDisconnectMessage);
+
+ String[] retArray = new String[] {"1586076045524,0.530635,"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute("select * from root.sg.d1");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}